diff options
-rw-r--r-- | .github/workflows/build.yml | 12 | ||||
-rwxr-xr-x | Makefile | 6 | ||||
-rw-r--r-- | interfaces/events/handler.go | 10 | ||||
-rw-r--r-- | interfaces/events/pool_events.go | 65 | ||||
-rw-r--r-- | interfaces/events/worker_events.go | 34 | ||||
-rwxr-xr-x | interfaces/factory/factory.go | 2 | ||||
-rw-r--r-- | interfaces/pool/pool.go | 102 | ||||
-rw-r--r-- | interfaces/worker/factory.go | 18 | ||||
-rw-r--r-- | interfaces/worker/watcher.go | 26 | ||||
-rw-r--r-- | interfaces/worker/worker.go | 62 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 19 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 11 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 19 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 3 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 9 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 18 | ||||
-rw-r--r-- | plugins/http/handler.go | 8 | ||||
-rw-r--r-- | plugins/http/plugin.go | 11 | ||||
-rw-r--r-- | plugins/http/tests/http_test.go | 4 | ||||
-rw-r--r-- | plugins/server/plugin.go | 11 | ||||
-rwxr-xr-x | util/events.go | 12 |
22 files changed, 401 insertions, 65 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f69d672a..204173cd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -65,7 +65,11 @@ jobs: - name: Run golang tests on Windows without codecov if: ${{ matrix.os == 'windows-latest' }} run: | - go test -v -race -cover -tags=debug . + go test -v -race -cover -tags=debug ./pkg/pipe + go test -v -race -cover -tags=debug ./pkg/pool + go test -v -race -cover -tags=debug ./pkg/socket + go test -v -race -cover -tags=debug ./pkg/worker + go test -v -race -cover -tags=debug ./pkg/worker_watcher go test -v -race -cover -tags=debug ./plugins/rpc go test -v -race -cover -tags=debug ./plugins/rpc/tests go test -v -race -cover -tags=debug ./plugins/config/tests @@ -86,7 +90,11 @@ jobs: if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/lib.txt -covermode=atomic . + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker_watcher.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc_config.txt -covermode=atomic ./plugins/rpc go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./plugins/rpc/tests go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/plugin_config.txt -covermode=atomic ./plugins/config/tests @@ -24,7 +24,11 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test: ## Run application tests - go test -v -race -cover -tags=debug -covermode=atomic . + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc/tests go test -v -race -cover -tags=debug -covermode=atomic ./plugins/config/tests diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go new file mode 100644 index 00000000..01f64d73 --- /dev/null +++ b/interfaces/events/handler.go @@ -0,0 +1,10 @@ +package events + +type Handler interface { + NumListeners() int + AddListener(listener EventListener) + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type EventListener func(event interface{}) diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go new file mode 100644 index 00000000..cc32f6b2 --- /dev/null +++ b/interfaces/events/pool_events.go @@ -0,0 +1,65 @@ +package events + +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 7800 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // todo: EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + EventTTL + + // todo: EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go new file mode 100644 index 00000000..497f0a06 --- /dev/null +++ b/interfaces/events/worker_events.go @@ -0,0 +1,34 @@ +package events + +// EventWorkerKill thrown after WorkerProcess is being forcefully killed. +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError E = iota + 200 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type E int64 + +func (ev E) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event E + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go index 036ff4e7..51b73501 100755 --- a/interfaces/factory/factory.go +++ b/interfaces/factory/factory.go @@ -4,7 +4,7 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/interfaces/worker" ) // Factory is responsible of wrapping given command into tasks WorkerProcess. diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go index 4eadf064..a1015fd6 100644 --- a/interfaces/pool/pool.go +++ b/interfaces/pool/pool.go @@ -1 +1,103 @@ package pool + +import ( + "context" + "runtime" + "time" + + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // AddListener connects event listener to the pool. + AddListener(listener events.EventListener) + + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec + Exec(rqs internal.Payload) (internal.Payload, error) + + ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []worker.BaseProcess) + + // Remove worker from the pool. + RemoveWorker(worker worker.BaseProcess) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} + +// Configures the pool behaviour. +type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers int64 + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs int64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig +} + +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = int64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick uint64 + + // TTL defines maximum time worker is allowed to live. + TTL uint64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL uint64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL uint64 + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = 1 + } +} diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go new file mode 100644 index 00000000..19e2bf5d --- /dev/null +++ b/interfaces/worker/factory.go @@ -0,0 +1,18 @@ +package worker + +import ( + "context" + "os/exec" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd) (BaseProcess, error) + // Close the factory and underlying connections. + Close(ctx context.Context) error +} diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go new file mode 100644 index 00000000..ce2c1c5a --- /dev/null +++ b/interfaces/worker/watcher.go @@ -0,0 +1,26 @@ +package worker + +import "context" + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []BaseProcess) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (BaseProcess, error) + + // PutWorker enqueues worker back + PushWorker(w BaseProcess) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []BaseProcess + + // RemoveWorker remove worker from the stack + RemoveWorker(wb BaseProcess) error +} diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go new file mode 100644 index 00000000..edbc68d9 --- /dev/null +++ b/interfaces/worker/worker.go @@ -0,0 +1,62 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/internal" +) + +// Allocator is responsible for worker allocation in the pool +type Allocator func() (BaseProcess, error) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // AddListener attaches listener to consume worker events. + AddListener(listener events.EventListener) + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop(ctx context.Context) error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() goridge.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl goridge.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs internal.Payload) (internal.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) +} diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 8250d226..99212ff8 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -413,7 +413,7 @@ func Test_Broken(t *testing.T) { data := "" mu := &sync.Mutex{} w.AddListener(func(event interface{}) { - if wev, ok := event.(worker.Event); ok { + if wev, ok := event.(events.WorkerEvent); ok { mu.Lock() data = string(wev.Payload.([]byte)) mu.Unlock() diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 220ea8e9..691290b2 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -5,6 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -40,7 +41,7 @@ type StaticPool struct { factory worker.Factory // distributes the events - events worker.EventsHandler + events events.Handler // manages worker states and TTLs ww worker.Watcher @@ -120,7 +121,7 @@ func ExecAfter(after ...After) Options { } // AddListener connects event listener to the pool. -func (sp *StaticPool) AddListener(listener worker.EventListener) { +func (sp *StaticPool) AddListener(listener events.EventListener) { sp.events.AddListener(listener) } @@ -169,7 +170,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(p) @@ -221,7 +222,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(rqs) @@ -252,7 +253,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) return nil, errors.E(op, err) } // else if err not nil - return error @@ -274,13 +275,13 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } w.State().Set(internal.StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { sp.ww.PushWorker(w) @@ -290,7 +291,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } w.State().Set(internal.StateInvalid) - sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -325,7 +326,7 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { r, err := sw.(worker.SyncWorker).Exec(p) if stopErr := sw.Stop(context.Background()); stopErr != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) } return r, err diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 8b13c7c9..0794b8e6 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,8 +12,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" @@ -179,8 +178,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { block := make(chan struct{}) p.AddListener(func(event interface{}) { - if wev, ok := event.(worker.Event); ok { - if wev.Event == worker.EventWorkerLog { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerLog { e := string(wev.Payload.([]byte)) if strings.ContainsAny(e, "undefined_function()") { block <- struct{}{} @@ -227,8 +226,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) p.AddListener(func(event interface{}) { - if pe, ok := event.(pool.Event); ok { - if pe.Event == pool.EventWorkerConstruct { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { wg.Done() } } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 0a2d16f7..6d1f0c58 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -7,6 +7,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -22,13 +23,13 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig - events worker.EventsHandler + events events.Handler pool pool.Pool stopCh chan struct{} mu *sync.RWMutex } -func newPoolWatcher(pool pool.Pool, events worker.EventsHandler, cfg *SupervisorConfig) Supervised { +func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, events: events, @@ -91,7 +92,7 @@ func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) { return rsp, nil } -func (sp *supervised) AddListener(listener worker.EventListener) { +func (sp *supervised) AddListener(listener events.EventListener) { sp.pool.AddListener(listener) } @@ -156,20 +157,20 @@ func (sp *supervised) control() { if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(pool.Event{Event: pool.EventTTL, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(pool.Event{Event: pool.EventMaxMemory, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue } @@ -197,10 +198,10 @@ func (sp *supervised) control() { if sp.cfg.IdleTTL-uint64(res) <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(pool.Event{Event: pool.EventIdleTTL, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } } } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 0fcde2c3..1eb1396e 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,6 +6,7 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "go.uber.org/multierr" @@ -193,7 +194,7 @@ func (tw *syncWorker) Created() time.Time { return tw.w.Created() } -func (tw *syncWorker) AddListener(listener worker.EventListener) { +func (tw *syncWorker) AddListener(listener events.EventListener) { tw.w.AddListener(listener) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index d2b4374b..998ed592 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,6 +13,7 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/util" @@ -43,7 +44,7 @@ type Process struct { created time.Time // updates parent supervisor or pool about Process events - events worker.EventsHandler + events events.Handler // state holds information about current Process state, // number of Process executions, buf status change time. @@ -119,7 +120,7 @@ func (w *Process) Created() time.Time { } // AddListener registers new worker event listener. -func (w *Process) AddListener(listener worker.EventListener) { +func (w *Process) AddListener(listener events.EventListener) { w.events.AddListener(listener) } @@ -279,7 +280,7 @@ func (w *Process) watch() { buf := w.get() // read the last data n, _ := w.rd.Read(*buf) - w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message w.stderr.Write((*buf)[:n]) @@ -290,7 +291,7 @@ func (w *Process) watch() { // read the max 10kb of stderr per one read buf := w.get() n, _ := w.rd.Read(*buf) - w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message w.stderr.Write((*buf)[:n]) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 530ce5d6..8a71ff8a 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" @@ -140,7 +140,7 @@ func (stack *Stack) Destroy(ctx context.Context) { } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events worker.EventsHandler) worker.Watcher { +func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, @@ -158,7 +158,7 @@ type workerWatcher struct { allocator worker.Allocator initialNumWorkers int64 actualNumWorkers int64 - events worker.EventsHandler + events events.Handler } func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { @@ -229,8 +229,8 @@ func (ww *workerWatcher) AllocateNew() error { ww.stack.mutex.Unlock() ww.PushWorker(sw) - ww.events.Push(pool.Event{ - Event: pool.EventWorkerConstruct, + ww.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, Payload: sw, }) @@ -279,8 +279,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("process wait") err := w.Wait() if err != nil { - ww.events.Push(worker.Event{ - Event: worker.EventWorkerError, + ww.events.Push(events.WorkerEvent{ + Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err), }) @@ -294,8 +294,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { - ww.events.Push(pool.Event{ - Event: pool.EventPoolError, + ww.events.Push(events.PoolEvent{ + Event: events.EventPoolError, Payload: errors.E(op, err), }) } diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 4cc08c41..57590bfd 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,9 +10,9 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" ) const ( @@ -26,7 +26,7 @@ const ( const MB = 1024 * 1024 type Handle interface { - AddListener(l worker.EventListener) + AddListener(l events.EventListener) ServeHTTP(w http.ResponseWriter, r *http.Request) } @@ -75,7 +75,7 @@ type handler struct { log log.Logger pool pool.Pool mul sync.Mutex - lsn worker.EventListener + lsn events.EventListener } func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) { @@ -91,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool po } // Listen attaches handler event controller. -func (h *handler) AddListener(l worker.EventListener) { +func (h *handler) AddListener(l events.EventListener) { h.mul.Lock() defer h.mul.Unlock() diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 9cb01d4b..460263f6 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/endure" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" @@ -55,8 +56,8 @@ type Plugin struct { cfg *Config // middlewares to chain mdwr middleware - // Event listener to stdout - listener worker.EventListener + // WorkerEvent listener to stdout + listener events.EventListener // Pool which attached to all servers pool pool.Pool @@ -71,7 +72,7 @@ type Plugin struct { } // AddListener attaches server event controller. -func (s *Plugin) AddListener(listener worker.EventListener) { +func (s *Plugin) AddListener(listener events.EventListener) { // save listeners for Reset s.listener = listener s.pool.AddListener(listener) @@ -124,8 +125,8 @@ func (s *Plugin) logCallback(event interface{}) { s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr) case ErrorEvent: s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error) - case worker.Event: - s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.State()) + case events.WorkerEvent: + s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.(worker.BaseProcess).State()) default: fmt.Println(event) } diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index 2979949d..8b15cf0c 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -19,7 +19,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -901,7 +901,7 @@ func TestHttpEchoErr(t *testing.T) { mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1") mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any()) - mockLogger.EXPECT().Debug("worker event received", "event", worker.EventWorkerLog, "worker state", gomock.Any()) + mockLogger.EXPECT().Debug("worker event received", "event", events.EventWorkerLog, "worker state", gomock.Any()) err = cont.RegisterAll( cfg, diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 7c91bbcc..e6003fbc 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" @@ -169,12 +170,12 @@ func (server *Plugin) setEnv(e server.Env) []string { } func (server *Plugin) collectLogs(event interface{}) { - if we, ok := event.(worker.Event); ok { + if we, ok := event.(events.WorkerEvent); ok { switch we.Event { - case worker.EventWorkerError: - server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) - case worker.EventWorkerLog: - server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) + case events.EventWorkerError: + server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) + case events.EventWorkerLog: + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) } } } diff --git a/util/events.go b/util/events.go index 8aa5a1c6..259c7ddb 100755 --- a/util/events.go +++ b/util/events.go @@ -1,14 +1,16 @@ package util -import "github.com/spiral/roadrunner/v2/interfaces/worker" +import ( + "github.com/spiral/roadrunner/v2/interfaces/events" +) // EventHandler helps to broadcast events to multiple listeners. type EventHandler struct { - listeners []worker.EventListener + listeners []events.EventListener } -func NewEventsHandler() worker.EventsHandler { - return &EventHandler{listeners: make([]worker.EventListener, 0, 2)} +func NewEventsHandler() events.Handler { + return &EventHandler{listeners: make([]events.EventListener, 0, 2)} } // NumListeners returns number of event listeners. @@ -17,7 +19,7 @@ func (eb *EventHandler) NumListeners() int { } // AddListener registers new event listener. -func (eb *EventHandler) AddListener(listener worker.EventListener) { +func (eb *EventHandler) AddListener(listener events.EventListener) { eb.listeners = append(eb.listeners, listener) } |