diff options
author | Valery Piashchynski <[email protected]> | 2020-12-26 13:13:59 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-26 13:13:59 +0300 |
commit | 1aaf6e6ffb015cd5a21d9d938ad84c18723973c5 (patch) | |
tree | 7221427ba25f8f99a10720778a420703dc2bff92 | |
parent | de4d0b6e66ceda08b0daec18a218c1baa71ebf04 (diff) |
Reorganize eventsv2.0.0-beta4
-rwxr-xr-x | Makefile | 5 | ||||
-rw-r--r-- | interfaces/events/handler.go | 8 | ||||
-rw-r--r-- | interfaces/events/pool_events.go | 8 | ||||
-rw-r--r-- | interfaces/events/worker_events.go | 9 | ||||
-rw-r--r-- | interfaces/worker/factory.go | 4 | ||||
-rwxr-xr-x | pkg/events/events.go | 23 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 6 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 4 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 4 | ||||
-rw-r--r-- | plugins/http/handler.go | 6 | ||||
-rw-r--r-- | plugins/server/interface.go | 4 | ||||
-rw-r--r-- | plugins/server/plugin.go | 8 |
13 files changed, 49 insertions, 44 deletions
@@ -12,7 +12,7 @@ help: ## Show this help @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z0-9_-]+:.*?## / {printf " \033[32m%-14s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) build: ## Build RR binary file for local os/arch - CGO_ENABLED=0 go build -trimpath -ldflags "-s" -o ./rr ./cmd/rr/main.go + CGO_ENABLED=0 go build -trimpath -ldflags "-s" -o ./rr ./cmd/main.go clean: ## Make some clean rm ./rr @@ -46,5 +46,4 @@ test: ## Run application tests go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/static lint: ## Run application linters - go fmt ./... - golint ./... + golangci-lint run diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go index 01f64d73..ac6c15a4 100644 --- a/interfaces/events/handler.go +++ b/interfaces/events/handler.go @@ -1,10 +1,14 @@ package events +// Handler interface type Handler interface { + // Return number of active listeners NumListeners() int - AddListener(listener EventListener) + // AddListener adds lister to the publisher + AddListener(listener Listener) + // Push pushes event to the listeners Push(e interface{}) } // Event listener listens for the events produced by worker, worker pool or other service. -type EventListener func(event interface{}) +type Listener func(event interface{}) diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go index cc32f6b2..d1464e1a 100644 --- a/interfaces/events/pool_events.go +++ b/interfaces/events/pool_events.go @@ -16,16 +16,16 @@ const ( // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed EventNoFreeWorkers - // todo: EventMaxMemory caused when worker consumes more memory than allowed. + // EventMaxMemory caused when worker consumes more memory than allowed. EventMaxMemory - // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError EventTTL - // todo: EventIdleTTL triggered when worker spends too much time at rest. + // EventIdleTTL triggered when worker spends too much time at rest. EventIdleTTL - // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL ) diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go index 497f0a06..2bff1811 100644 --- a/interfaces/events/worker_events.go +++ b/interfaces/events/worker_events.go @@ -1,17 +1,16 @@ package events -// EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError E = iota + 200 + EventWorkerError W = iota + 200 // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog ) -type E int64 +type W int64 -func (ev E) String() string { +func (ev W) String() string { switch ev { case EventWorkerError: return "EventWorkerError" @@ -24,7 +23,7 @@ func (ev E) String() string { // WorkerEvent wraps worker events. type WorkerEvent struct { // Event id, see below. - Event E + Event W // Worker triggered the event. Worker interface{} diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go index 8412428d..376303df 100644 --- a/interfaces/worker/factory.go +++ b/interfaces/worker/factory.go @@ -11,10 +11,10 @@ import ( type Factory interface { // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.EventListener) (BaseProcess, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (BaseProcess, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.EventListener) (BaseProcess, error) + SpawnWorker(*exec.Cmd, ...events.Listener) (BaseProcess, error) // Close the factory and underlying connections. Close() error } diff --git a/pkg/events/events.go b/pkg/events/events.go index 92dc103a..226a0c91 100755 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -6,32 +6,35 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" ) -// EventHandler helps to broadcast events to multiple listeners. -type EventHandler struct { - listeners []events.EventListener - sync.RWMutex +// HandlerImpl helps to broadcast events to multiple listeners. +type HandlerImpl struct { + listeners []events.Listener + sync.RWMutex // all receivers should be pointers } func NewEventsHandler() events.Handler { - return &EventHandler{listeners: make([]events.EventListener, 0, 2)} + return &HandlerImpl{listeners: make([]events.Listener, 0, 2)} } // NumListeners returns number of event listeners. -func (eb *EventHandler) NumListeners() int { +func (eb *HandlerImpl) NumListeners() int { + eb.Lock() + defer eb.Unlock() return len(eb.listeners) } // AddListener registers new event listener. -func (eb *EventHandler) AddListener(listener events.EventListener) { +func (eb *HandlerImpl) AddListener(listener events.Listener) { eb.Lock() defer eb.Unlock() eb.listeners = append(eb.listeners, listener) } // Push broadcast events across all event listeners. -func (eb *EventHandler) Push(e interface{}) { - eb.Lock() - defer eb.Unlock() +func (eb *HandlerImpl) Push(e interface{}) { + // ReadLock here because we are not changing listeners + eb.RLock() + defer eb.RUnlock() for k := range eb.listeners { eb.listeners[k](e) } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index ecb3fa71..c36c13e2 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -30,7 +30,7 @@ type SpawnResult struct { // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { @@ -113,7 +113,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("spawn worker") w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 23bb2d5f..808e7d35 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -40,7 +40,7 @@ type StaticPool struct { events events.Handler // saved list of event listeners - listeners []events.EventListener + listeners []events.Listener // manages worker states and TTLs ww worker.Watcher @@ -104,7 +104,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co return p, nil } -func AddListeners(listeners ...events.EventListener) Options { +func AddListeners(listeners ...events.Listener) Options { return func(p *StaticPool) { p.listeners = listeners for i := 0; i < len(listeners); i++ { @@ -114,7 +114,7 @@ func AddListeners(listeners ...events.EventListener) Options { } // AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.EventListener) { +func (sp *StaticPool) addListener(listener events.Listener) { sp.events.AddListener(listener) } diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index 38b3e7c9..ff882389 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -85,7 +85,7 @@ type socketSpawn struct { } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker_with_context") c := make(chan socketSpawn) go func() { @@ -145,7 +145,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker") w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index db182a3e..493882a8 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -117,7 +117,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, erro return w, nil } -func AddListeners(listeners ...events.EventListener) Options { +func AddListeners(listeners ...events.Listener) Options { return func(p *Process) { for i := 0; i < len(listeners); i++ { p.addListener(listeners[i]) @@ -136,7 +136,7 @@ func (w *Process) Created() time.Time { } // AddListener registers new worker event listener. -func (w *Process) addListener(listener events.EventListener) { +func (w *Process) addListener(listener events.Listener) { w.events.AddListener(listener) } diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 1889ed6d..15954f96 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -26,7 +26,7 @@ const ( const MB = 1024 * 1024 type Handle interface { - AddListener(l events.EventListener) + AddListener(l events.Listener) ServeHTTP(w http.ResponseWriter, r *http.Request) } @@ -75,7 +75,7 @@ type handler struct { log logger.Logger pool pool.Pool mul sync.Mutex - lsn events.EventListener + lsn events.Listener } func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) { @@ -91,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool po } // Listen attaches handler event controller. -func (h *handler) AddListener(l events.EventListener) { +func (h *handler) AddListener(l events.Listener) { h.mul.Lock() defer h.mul.Unlock() diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 9c1079ea..98345694 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -15,6 +15,6 @@ type Env map[string]string // Server creates workers for the application. type Server interface { CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) - NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) + NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) + NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index b280d253..5b023bc6 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -104,10 +104,10 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("new worker") - list := make([]events.EventListener, 0, len(listeners)) + list := make([]events.Listener, 0, len(listeners)) list = append(list, server.collectWorkerLogs) spawnCmd, err := server.CmdFactory(env) @@ -124,14 +124,14 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { const op = errors.Op("server plugins new worker pool") spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) } - list := make([]events.EventListener, 0, len(listeners)) + list := make([]events.Listener, 0, len(listeners)) list = append(list, server.collectPoolLogs) p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...)) |