diff options
author | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
commit | 7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch) | |
tree | 3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg | |
parent | ee5d34abde7f3931bf939498eb7a8cb170232f4f (diff) |
interfaces folder deprecated
Diffstat (limited to 'pkg')
27 files changed, 587 insertions, 425 deletions
diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio new file mode 100644 index 00000000..fd78d5a5 --- /dev/null +++ b/pkg/doc/pool_workflow.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-01-23T19:14:50.556Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="3_IXXh0-4hROHlpEF-EL" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE=</diagram></mxfile>
\ No newline at end of file diff --git a/pkg/doc/workflow.drawio b/pkg/doc/workflow.drawio new file mode 100644 index 00000000..d32d7b2d --- /dev/null +++ b/pkg/doc/workflow.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-01-23T19:13:52.763Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="OK65Xj3LZBjWN0qK7Y8T" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE=</diagram></mxfile>
\ No newline at end of file diff --git a/pkg/events/events.go b/pkg/events/general.go index 226a0c91..a09a8759 100755 --- a/pkg/events/events.go +++ b/pkg/events/general.go @@ -2,18 +2,16 @@ package events import ( "sync" - - "github.com/spiral/roadrunner/v2/interfaces/events" ) // HandlerImpl helps to broadcast events to multiple listeners. type HandlerImpl struct { - listeners []events.Listener + listeners []Listener sync.RWMutex // all receivers should be pointers } -func NewEventsHandler() events.Handler { - return &HandlerImpl{listeners: make([]events.Listener, 0, 2)} +func NewEventsHandler() Handler { + return &HandlerImpl{listeners: make([]Listener, 0, 2)} } // NumListeners returns number of event listeners. @@ -24,7 +22,7 @@ func (eb *HandlerImpl) NumListeners() int { } // AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener events.Listener) { +func (eb *HandlerImpl) AddListener(listener Listener) { eb.Lock() defer eb.Unlock() eb.listeners = append(eb.listeners, listener) diff --git a/pkg/events/interface.go b/pkg/events/interface.go new file mode 100644 index 00000000..ac6c15a4 --- /dev/null +++ b/pkg/events/interface.go @@ -0,0 +1,14 @@ +package events + +// Handler interface +type Handler interface { + // Return number of active listeners + NumListeners() int + // AddListener adds lister to the publisher + AddListener(listener Listener) + // Push pushes event to the listeners + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type Listener func(event interface{}) diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go new file mode 100644 index 00000000..2cc76eee --- /dev/null +++ b/pkg/events/pool_events.go @@ -0,0 +1,66 @@ +package events + +// TODO event numbers +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 7800 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go new file mode 100644 index 00000000..2bff1811 --- /dev/null +++ b/pkg/events/worker_events.go @@ -0,0 +1,33 @@ +package events + +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError W = iota + 200 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type W int64 + +func (ev W) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event W + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} diff --git a/pkg/pool/config.go b/pkg/pool/config.go index e3e2d3cd..cf4aaaee 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() { type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. - WatchTick uint64 `mapstructure:"watch_tick"` + WatchTick uint64 // TTL defines maximum time worker is allowed to live. - TTL uint64 `mapstructure:"ttl"` + TTL uint64 // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 `mapstructure:"idle_ttl"` + IdleTTL uint64 // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 `mapstructure:"exec_ttl"` + ExecTTL uint64 // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` + MaxWorkerMemory uint64 } // InitDefaults enables default config values. diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go new file mode 100644 index 00000000..f3fe4065 --- /dev/null +++ b/pkg/pool/interface.go @@ -0,0 +1,29 @@ +package pool + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec executes task with payload + Exec(rqs payload.Payload) (payload.Payload, error) + + // ExecWithContext executes task with context which is used with timeout + ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []*worker.SyncWorkerImpl) + + // Remove worker from the pool. + RemoveWorker(worker worker.SyncWorker) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index d1b726c1..bb416b29 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -6,13 +6,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/transport" + "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) @@ -20,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) type Options func(p *StaticPool) @@ -34,7 +32,7 @@ type StaticPool struct { cmd Command // creates and connects to stack - factory worker.Factory + factory transport.Factory // distributes the events events events.Handler @@ -43,7 +41,7 @@ type StaticPool struct { listeners []events.Listener // manages worker states and TTLs - ww worker.Watcher + ww workerWatcher.Watcher // allocate new worker allocator worker.Allocator @@ -53,7 +51,7 @@ type StaticPool struct { } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) { +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) @@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co cfg: cfg, cmd: cmd, factory: factory, - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), } // add pool options @@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co } p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) - p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { @@ -124,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} { } // Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { +func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) { return sp.ww.WorkersList() } -func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { +func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { return sp.ww.RemoveWorker(wb) } @@ -153,12 +151,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { // worker want's to be terminated // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { - sp.stopWorker(&w) + sp.stopWorker(w) return sp.Exec(p) } - err = sp.checkMaxJobs(&w) + err = sp.checkMaxJobs(w) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -183,11 +181,11 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p // worker want's to be terminated if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { - sp.stopWorker(&w) + sp.stopWorker(w) return sp.ExecWithContext(ctx, p) } - err = sp.checkMaxJobs(&w) + err = sp.checkMaxJobs(w) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -195,30 +193,30 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p return rsp, nil } -func (sp *StaticPool) stopWorker(w *worker.SyncWorker) { +func (sp *StaticPool) stopWorker(w worker.SyncWorker) { const op = errors.Op("static_pool_stop_worker") - (*w).State().Set(internal.StateInvalid) - err := (*w).Stop() + w.State().Set(internal.StateInvalid) + err := w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error { +func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { const op = errors.Op("static_pool_check_max_jobs") - if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err := sp.ww.AllocateNew() if err != nil { return errors.E(op, err) } } else { - sp.ww.PushWorker(*w) + sp.ww.PushWorker(w) } return nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) { // GetFreeWorker function consumes context with timeout w, err := sp.ww.GetFreeWorker(ctxGetFree) if err != nil { @@ -230,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke // else if err not nil - return error return nil, errors.E(op, err) } - return w.(worker.SyncWorker), nil + return w, nil } // Destroy all underlying stack (but let them to complete the task). @@ -239,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.SyncWorker) (payload.Payload, error) { const op = errors.Op("error encoder") // just push event if on any stage was timeout error if errors.Is(errors.ExecTTL, err) { @@ -277,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (worker.BaseProcess, error) { +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (*worker.SyncWorkerImpl, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) @@ -286,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return nil, err } - sw, err := syncWorker.From(w) - if err != nil { - return nil, err - } + sw := worker.From(w) sp.events.Push(events.PoolEvent{ Event: events.EventWorkerConstruct, @@ -305,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, err } - r, err := sw.(worker.SyncWorker).Exec(p) + r, err := sw.Exec(p) if stopErr := sw.Stop(); stopErr != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) @@ -315,9 +310,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) { const op = errors.Op("allocate workers") - var workers []worker.BaseProcess + var workers []worker.SyncWorker // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { @@ -326,11 +321,7 @@ func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, e return nil, errors.E(op, errors.WorkerAllocate, err) } - sw, err := syncWorker.From(w) - if err != nil { - return nil, errors.E(op, err) - } - workers = append(workers, sw) + workers = append(workers, w) } return workers, nil } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 348f5297..a877b28f 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,10 +12,10 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/stretchr/testify/assert" ) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 19cda759..2bae8f9e 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -6,11 +6,10 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -20,7 +19,7 @@ const MB = 1024 * 1024 const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck type Supervised interface { - pool.Pool + Pool // Start used to start watching process for all pool workers Start() } @@ -28,12 +27,12 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig events events.Handler - pool pool.Pool + pool Pool stopCh chan struct{} mu *sync.RWMutex } -func supervisorWrapper(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, events: events, @@ -101,13 +100,13 @@ func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervised) Workers() (workers []worker.BaseProcess) { +func (sp *supervised) Workers() (workers []*worker.SyncWorkerImpl) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() } -func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { +func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { return sp.pool.RemoveWorker(worker) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index a9424cd5..58f63b7e 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/spiral/roadrunner/v2/tools" "github.com/stretchr/testify/assert" ) diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go new file mode 100644 index 00000000..299ac95f --- /dev/null +++ b/pkg/transport/interface.go @@ -0,0 +1,21 @@ +package transport + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + // Close the factory and underlying connections. + Close() error +} diff --git a/pkg/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index b656eff8..dd7c5841 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -6,10 +6,9 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) @@ -19,22 +18,22 @@ type Factory struct{} // NewPipeFactory returns new factory instance and starts // listening -func NewPipeFactory() worker.Factory { +func NewPipeFactory() *Factory { return &Factory{} } type SpawnResult struct { - w worker.BaseProcess + w *worker.Process err error } // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { c := make(chan SpawnResult) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { c <- SpawnResult{ w: nil, @@ -113,9 +112,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { return nil, errors.E(op, err) } diff --git a/pkg/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 805a24ee..2e5bbcd5 100644 --- a/pkg/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -8,15 +8,15 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func Test_GetState2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -28,14 +28,11 @@ func Test_GetState2(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } + assert.NoError(t, w.Stop()) } func Test_Kill2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) wg := &sync.WaitGroup{} @@ -58,7 +55,7 @@ func Test_Kill2(t *testing.T) { } func Test_Pipe_Start2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) assert.NoError(t, err) @@ -72,7 +69,7 @@ func Test_Pipe_Start2(t *testing.T) { } func Test_Pipe_StartError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err := cmd.Start() if err != nil { t.Errorf("error running the command: error %v", err) @@ -84,7 +81,7 @@ func Test_Pipe_StartError2(t *testing.T) { } func Test_Pipe_PipeError3(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -96,7 +93,7 @@ func Test_Pipe_PipeError3(t *testing.T) { } func Test_Pipe_PipeError4(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -108,7 +105,7 @@ func Test_Pipe_PipeError4(t *testing.T) { } func Test_Pipe_Failboot2(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) @@ -117,14 +114,14 @@ func Test_Pipe_Failboot2(t *testing.T) { } func Test_Pipe_Invalid2(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewPipeFactory().SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Pipe_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) @@ -136,10 +133,7 @@ func Test_Pipe_Echo2(t *testing.T) { } }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -152,7 +146,7 @@ func Test_Pipe_Echo2(t *testing.T) { } func Test_Pipe_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) @@ -163,10 +157,7 @@ func Test_Pipe_Broken2(t *testing.T) { assert.Error(t, err) }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -178,7 +169,7 @@ func Test_Pipe_Broken2(t *testing.T) { func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := f.SpawnWorker(cmd) go func() { if w.Wait() != nil { @@ -194,13 +185,11 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) + b.ReportAllocs() b.ResetTimer() go func() { @@ -224,7 +213,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { b.Fatal(err) @@ -237,10 +226,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -250,7 +236,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { b.Fatal(err) @@ -263,10 +249,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -276,28 +259,26 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { } func Test_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) + go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -308,26 +289,23 @@ func Test_Echo2(t *testing.T) { } func Test_BadPayload2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{}) + res, err := sw.Exec(payload.Payload{}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -337,7 +315,7 @@ func Test_BadPayload2(t *testing.T) { } func Test_String2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -350,13 +328,13 @@ func Test_String2(t *testing.T) { } }() - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") assert.Contains(t, w.String(), "ready") assert.Contains(t, w.String(), "numExecs: 0") } func Test_Echo_Slow2(t *testing.T) { - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -369,12 +347,9 @@ func Test_Echo_Slow2(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -385,7 +360,7 @@ func Test_Echo_Slow2(t *testing.T) { } func Test_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") data := "" mu := &sync.Mutex{} listener := func(event interface{}) { @@ -401,12 +376,9 @@ func Test_Broken2(t *testing.T) { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -421,7 +393,7 @@ func Test_Broken2(t *testing.T) { } func Test_Error2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -435,12 +407,9 @@ func Test_Error2(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -452,7 +421,7 @@ func Test_Error2(t *testing.T) { } func Test_NumExecs2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -465,24 +434,21 @@ func Test_NumExecs2(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index a2731294..fa37ac0f 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -9,16 +9,16 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func Test_GetState(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -38,7 +38,7 @@ func Test_GetState(t *testing.T) { func Test_Kill(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) wg := &sync.WaitGroup{} @@ -62,7 +62,7 @@ func Test_Kill(t *testing.T) { func Test_Pipe_Start(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) @@ -76,7 +76,7 @@ func Test_Pipe_Start(t *testing.T) { } func Test_Pipe_StartError(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err := cmd.Start() if err != nil { t.Errorf("error running the command: error %v", err) @@ -89,7 +89,7 @@ func Test_Pipe_StartError(t *testing.T) { } func Test_Pipe_PipeError(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -102,7 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) { } func Test_Pipe_PipeError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -115,7 +115,7 @@ func Test_Pipe_PipeError2(t *testing.T) { } func Test_Pipe_Failboot(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) @@ -125,7 +125,7 @@ func Test_Pipe_Failboot(t *testing.T) { } func Test_Pipe_Invalid(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -133,7 +133,7 @@ func Test_Pipe_Invalid(t *testing.T) { } func Test_Pipe_Echo(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -146,10 +146,7 @@ func Test_Pipe_Echo(t *testing.T) { } }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -162,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) { } func Test_Pipe_Broken(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -174,10 +171,7 @@ func Test_Pipe_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -189,7 +183,7 @@ func Test_Pipe_Broken(t *testing.T) { func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) go func() { if w.Wait() != nil { @@ -205,13 +199,11 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) + b.ReportAllocs() b.ResetTimer() go func() { @@ -235,7 +227,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -249,10 +241,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -262,7 +251,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -276,10 +265,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -290,28 +276,25 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { func Test_Echo(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -323,26 +306,23 @@ func Test_Echo(t *testing.T) { func Test_BadPayload(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{}) + res, err := sw.Exec(payload.Payload{}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -353,7 +333,7 @@ func Test_BadPayload(t *testing.T) { func Test_String(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -366,14 +346,14 @@ func Test_String(t *testing.T) { } }() - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") assert.Contains(t, w.String(), "ready") assert.Contains(t, w.String(), "numExecs: 0") } func Test_Echo_Slow(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -386,12 +366,9 @@ func Test_Echo_Slow(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -403,7 +380,7 @@ func Test_Echo_Slow(t *testing.T) { func Test_Broken(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") data := "" mu := &sync.Mutex{} listener := func(event interface{}) { @@ -419,12 +396,9 @@ func Test_Broken(t *testing.T) { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -440,7 +414,7 @@ func Test_Broken(t *testing.T) { func Test_Error(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -454,12 +428,9 @@ func Test_Error(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -472,7 +443,7 @@ func Test_Error(t *testing.T) { func Test_NumExecs(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -485,24 +456,21 @@ func Test_NumExecs(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 8f99ff73..ccd2b0bf 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -11,10 +11,9 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" "golang.org/x/sync/errgroup" @@ -38,7 +37,7 @@ type Factory struct { // NewSocketServer returns Factory attached to a given socket listener. // tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory { +func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { f := &Factory{ ls: ls, tout: tout, @@ -80,18 +79,18 @@ func (f *Factory) listen() error { } type socketSpawn struct { - w worker.BaseProcess + w *worker.Process err error } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) go func() { ctx, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { c <- socketSpawn{ w: nil, @@ -145,9 +144,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { return nil, err } diff --git a/pkg/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 2f21e408..0e29e7d2 100644 --- a/pkg/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -26,7 +26,7 @@ func Test_Tcp_Start2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) assert.NoError(t, err) @@ -49,7 +49,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") f := NewSocketServer(ls, time.Minute) defer func() { @@ -82,7 +82,7 @@ func Test_Tcp_StartError2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err = cmd.Start() if err != nil { t.Errorf("error executing the command: error %v", err) @@ -106,7 +106,7 @@ func Test_Tcp_Failboot2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) @@ -127,7 +127,7 @@ func Test_Tcp_Invalid2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) assert.Error(t, err) @@ -147,7 +147,7 @@ func Test_Tcp_Broken2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -169,10 +169,7 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, err2) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) @@ -194,7 +191,7 @@ func Test_Tcp_Echo2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -207,10 +204,7 @@ func Test_Tcp_Echo2(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -230,7 +224,7 @@ func Test_Unix_Start2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) assert.NoError(t, err) @@ -254,7 +248,7 @@ func Test_Unix_Failboot2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) @@ -270,7 +264,7 @@ func Test_Unix_Timeout2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) assert.Nil(t, w) @@ -286,7 +280,7 @@ func Test_Unix_Invalid2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) assert.Error(t, err) @@ -301,7 +295,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -322,10 +316,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, err) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -343,7 +334,7 @@ func Test_Unix_Echo2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -359,10 +350,7 @@ func Test_Unix_Echo2(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -384,7 +372,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := f.SpawnWorker(cmd) if err != nil { @@ -409,7 +397,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { assert.NoError(b, err) }() - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -422,10 +410,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -452,7 +437,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := f.SpawnWorker(cmd) if err != nil { @@ -481,7 +466,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -494,10 +479,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { diff --git a/pkg/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index 983f3e8e..f55fc3dd 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -29,7 +29,7 @@ func Test_Tcp_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) @@ -54,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") f := NewSocketServer(ls, time.Minute) defer func() { @@ -89,7 +89,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err = cmd.Start() if err != nil { t.Errorf("error executing the command: error %v", err) @@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -139,7 +139,7 @@ func Test_Tcp_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -162,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -184,7 +184,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -206,10 +206,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err2) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) @@ -233,7 +230,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -246,10 +243,7 @@ func Test_Tcp_Echo(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -275,7 +269,7 @@ func Test_Unix_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) @@ -305,7 +299,7 @@ func Test_Unix_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -327,7 +321,7 @@ func Test_Unix_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -349,7 +343,7 @@ func Test_Unix_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -370,7 +364,7 @@ func Test_Unix_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -391,10 +385,7 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -418,7 +409,7 @@ func Test_Unix_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -434,10 +425,7 @@ func Test_Unix_Echo(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -465,7 +453,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -496,7 +484,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -509,10 +497,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -537,7 +522,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -564,7 +549,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -577,10 +562,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go new file mode 100644 index 00000000..9d74ae10 --- /dev/null +++ b/pkg/worker/interface.go @@ -0,0 +1,56 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" +) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop() error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() relay.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl relay.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs payload.Payload) (payload.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 8314c039..1a0393fb 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,50 +8,67 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" ) -type syncWorker struct { - w worker.BaseProcess +// Allocator is responsible for worker allocation in the pool +type Allocator func() (*SyncWorkerImpl, error) + +type SyncWorkerImpl struct { + process *Process } // From creates SyncWorker from BaseProcess -func From(w worker.BaseProcess) (worker.SyncWorker, error) { - return &syncWorker{ - w: w, - }, nil +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + } +} + +// FromSync creates BaseProcess from SyncWorkerImpl +func FromSync(w *SyncWorkerImpl) BaseProcess { + return &Process{ + created: w.process.created, + events: w.process.events, + state: w.process.state, + cmd: w.process.cmd, + pid: w.process.pid, + stderr: w.process.stderr, + endState: w.process.endState, + relay: w.process.relay, + rd: w.process.rd, + } } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.w.State().Value() != internal.StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + if tw.process.State().Value() != internal.StateReady { + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() return rsp, nil } @@ -62,7 +79,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - if tw.w.State().Value() != internal.StateReady { + if tw.process.State().Value() != internal.StateReady { c <- wexec{ payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } c <- wexec{ payload: payload.Payload{}, @@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() c <- wexec{ payload: rsp, @@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } } -func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() @@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { frameR := frame.NewFrame() - err = tw.w.Relay().Receive(frameR) + err = tw.process.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return pl, nil } -func (tw *syncWorker) String() string { - return tw.w.String() +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() } -func (tw *syncWorker) Pid() int64 { - return tw.w.Pid() +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() } -func (tw *syncWorker) Created() time.Time { - return tw.w.Created() +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() } -func (tw *syncWorker) State() internal.State { - return tw.w.State() +func (tw *SyncWorkerImpl) State() internal.State { + return tw.process.State() } -func (tw *syncWorker) Start() error { - return tw.w.Start() +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() } -func (tw *syncWorker) Wait() error { - return tw.w.Wait() +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() } -func (tw *syncWorker) Stop() error { - return tw.w.Stop() +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() } -func (tw *syncWorker) Kill() error { - return tw.w.Kill() +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() } -func (tw *syncWorker) Relay() relay.Relay { - return tw.w.Relay() +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() } -func (tw *syncWorker) AttachRelay(rl relay.Relay) { - tw.w.AttachRelay(rl) +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index 40988b06..df556e93 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) { w, _ := InitBaseWorker(cmd) - syncWorker, err := From(w) - if err != nil { - t.Fatal(err) - } + sw := From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index bf70d646..8fd71cca 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,10 +13,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -78,14 +76,14 @@ type Process struct { } // InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) { +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { const op = errors.Op("init_base_worker") if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } w := &Process{ created: time.Now(), - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), @@ -198,7 +196,7 @@ func (w *Process) Wait() error { // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first - // and then w.cmd.Wait return an error + // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(internal.StateErrored) diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go new file mode 100644 index 00000000..13991541 --- /dev/null +++ b/pkg/worker_watcher/interface.go @@ -0,0 +1,30 @@ +package worker_watcher //nolint:golint,stylecheck + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []worker.SyncWorker) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) + + // PutWorker enqueues worker back + PushWorker(w worker.SyncWorker) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []*worker.SyncWorkerImpl + + // RemoveWorker remove worker from the stack + RemoveWorker(wb worker.SyncWorker) error +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index c87e8b65..2d23d0e9 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,12 +5,12 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []worker.BaseProcess + workers []*worker.SyncWorkerImpl mutex sync.RWMutex destroy bool actualNumOfWorkers uint64 @@ -20,7 +20,7 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]worker.BaseProcess, 0, w), + workers: make([]*worker.SyncWorkerImpl, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } @@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) { stack.mutex.Lock() defer stack.mutex.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) + stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) } func (stack *Stack) IsEmpty() bool { @@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool { return len(stack.workers) == 0 } -func (stack *Stack) Pop() (worker.BaseProcess, bool) { +func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() @@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.BaseProcess { +func (stack *Stack) Workers() []*worker.SyncWorkerImpl { stack.mutex.Lock() defer stack.mutex.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) + workersCopy := make([]*worker.SyncWorkerImpl, 0, 1) // copy for _, v := range stack.workers { - workersCopy = append(workersCopy, v) + if v != nil { + workersCopy = append(workersCopy, v) + } } return workersCopy diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 86af2043..5287a6dc 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -5,24 +5,25 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []worker.BaseProcess{}, stack.workers) + assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) } func TestStack_Push(t *testing.T) { stack := NewWorkersStack(1) - w, err := workerImpl.InitBaseWorker(&exec.Cmd{}) + w, err := worker.InitBaseWorker(&exec.Cmd{}) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) } @@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) _, _ = stack.Pop() @@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) { func TestStack_FindAndRemoveByPid(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.FindAndRemoveByPid(w.Pid()) @@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) assert.Equal(t, false, stack.IsEmpty()) @@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) { func TestStack_Workers(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) wrks := stack.Workers() assert.Equal(t, 1, len(wrks)) @@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) { func TestStack_Reset(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.Reset() assert.Equal(t, uint64(0), stack.actualNumOfWorkers) @@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) { func TestStack_Destroy(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Destroy(context.Background()) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } @@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) { func TestStack_DestroyWithWait(t *testing.T) { stack := NewWorkersStack(2) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Push(sw) assert.Equal(t, uint64(2), stack.actualNumOfWorkers) go func() { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b0d39165..f87bd021 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,13 +5,13 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher { ww := &workerWatcher{ stack: NewWorkersStack(uint64(numWorkers)), allocator: allocator, @@ -28,18 +28,18 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { +func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.BaseProcess) { + go func(swc worker.SyncWorker) { ww.wait(swc) }(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error { return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { +func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { +func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.BaseProcess { +func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl { return ww.stack.Workers() } -func (ww *workerWatcher) wait(w worker.BaseProcess) { +func (ww *workerWatcher) wait(w worker.SyncWorker) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { @@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { +func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { go func() { ww.wait(wb) }() |