From 7fb3cc3588cfde9260a6bb431330ce1e0a71f56d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 23 Jan 2021 23:38:10 +0300 Subject: interfaces folder deprecated --- Makefile | 12 +- go.sum | 1 + interfaces/events/handler.go | 14 - interfaces/events/pool_events.go | 66 --- interfaces/events/worker_events.go | 33 -- interfaces/pool/pool.go | 100 ---- interfaces/worker/factory.go | 20 - interfaces/worker/watcher.go | 26 - interfaces/worker/worker.go | 59 --- pkg/doc/pool_workflow.drawio | 1 + pkg/doc/workflow.drawio | 1 + pkg/events/events.go | 41 -- pkg/events/general.go | 39 ++ pkg/events/interface.go | 14 + pkg/events/pool_events.go | 66 +++ pkg/events/worker_events.go | 33 ++ pkg/pipe/pipe_factory.go | 163 ------ pkg/pipe/pipe_factory_spawn_test.go | 490 ------------------ pkg/pipe/pipe_factory_test.go | 510 ------------------- pkg/pool/config.go | 10 +- pkg/pool/interface.go | 29 ++ pkg/pool/static_pool.go | 73 ++- pkg/pool/static_pool_test.go | 4 +- pkg/pool/supervisor_pool.go | 15 +- pkg/pool/supervisor_test.go | 2 +- pkg/socket/socket_factory.go | 229 --------- pkg/socket/socket_factory_spawn_test.go | 507 ------------------- pkg/socket/socket_factory_test.go | 590 ---------------------- pkg/transport/interface.go | 21 + pkg/transport/pipe/pipe_factory.go | 162 ++++++ pkg/transport/pipe/pipe_factory_spawn_test.go | 456 +++++++++++++++++ pkg/transport/pipe/pipe_factory_test.go | 478 ++++++++++++++++++ pkg/transport/socket/socket_factory.go | 228 +++++++++ pkg/transport/socket/socket_factory_spawn_test.go | 489 ++++++++++++++++++ pkg/transport/socket/socket_factory_test.go | 572 +++++++++++++++++++++ pkg/worker/interface.go | 56 ++ pkg/worker/sync_worker.go | 111 ++-- pkg/worker/sync_worker_test.go | 7 +- pkg/worker/worker.go | 10 +- pkg/worker_watcher/interface.go | 30 ++ pkg/worker_watcher/stack.go | 18 +- pkg/worker_watcher/stack_test.go | 53 +- pkg/worker_watcher/worker_watcher.go | 22 +- plugins/http/handler.go | 4 +- plugins/http/plugin.go | 16 +- plugins/informer/interface.go | 2 +- plugins/informer/plugin.go | 2 +- plugins/informer/rpc.go | 2 +- plugins/server/interface.go | 8 +- plugins/server/plugin.go | 17 +- tests/plugins/http/handler_test.go | 218 ++++---- tests/plugins/http/http_plugin_test.go | 2 +- tests/plugins/http/uploads_test.go | 2 +- tests/plugins/informer/test_plugin.go | 18 +- tests/plugins/server/plugin_pipes.go | 8 +- tests/plugins/server/plugin_sockets.go | 8 +- tests/plugins/server/plugin_tcp.go | 8 +- tools/process.go | 2 +- 58 files changed, 3011 insertions(+), 3167 deletions(-) delete mode 100644 interfaces/events/handler.go delete mode 100644 interfaces/events/pool_events.go delete mode 100644 interfaces/events/worker_events.go delete mode 100644 interfaces/pool/pool.go delete mode 100644 interfaces/worker/factory.go delete mode 100644 interfaces/worker/watcher.go delete mode 100644 interfaces/worker/worker.go create mode 100644 pkg/doc/pool_workflow.drawio create mode 100644 pkg/doc/workflow.drawio delete mode 100755 pkg/events/events.go create mode 100755 pkg/events/general.go create mode 100644 pkg/events/interface.go create mode 100644 pkg/events/pool_events.go create mode 100644 pkg/events/worker_events.go delete mode 100755 pkg/pipe/pipe_factory.go delete mode 100644 pkg/pipe/pipe_factory_spawn_test.go delete mode 100755 pkg/pipe/pipe_factory_test.go create mode 100644 pkg/pool/interface.go delete mode 100755 pkg/socket/socket_factory.go delete mode 100644 pkg/socket/socket_factory_spawn_test.go delete mode 100755 pkg/socket/socket_factory_test.go create mode 100644 pkg/transport/interface.go create mode 100755 pkg/transport/pipe/pipe_factory.go create mode 100644 pkg/transport/pipe/pipe_factory_spawn_test.go create mode 100755 pkg/transport/pipe/pipe_factory_test.go create mode 100755 pkg/transport/socket/socket_factory.go create mode 100644 pkg/transport/socket/socket_factory_spawn_test.go create mode 100755 pkg/transport/socket/socket_factory_test.go create mode 100644 pkg/worker/interface.go create mode 100644 pkg/worker_watcher/interface.go diff --git a/Makefile b/Makefile index b2321d59..d61d2bc6 100755 --- a/Makefile +++ b/Makefile @@ -27,9 +27,9 @@ test_coverage: docker-compose -f tests/docker-compose.yaml up -d rm -rf coverage mkdir coverage - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/transport/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/socket.out -covermode=atomic ./pkg/transport/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/socket.out -covermode=atomic ./pkg/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http @@ -58,9 +58,9 @@ test_coverage: test: ## Run application tests docker-compose -f tests/docker-compose.yaml up -d - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/socket go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http @@ -88,9 +88,9 @@ test: ## Run application tests test_1.14: ## Run application tests docker-compose -f tests/docker-compose.yaml up -d - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pipe + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool - go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/socket go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http diff --git a/go.sum b/go.sum index fa1c2cdf..368e7fd0 100755 --- a/go.sum +++ b/go.sum @@ -311,6 +311,7 @@ github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM= github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8= github.com/spiral/goridge/v3 v3.0.0/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/roadrunner v1.9.2 h1:jGtXs3r5fevdbrkDF8BdFxEY4rIZwplnns1oWj7Vyw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go deleted file mode 100644 index ac6c15a4..00000000 --- a/interfaces/events/handler.go +++ /dev/null @@ -1,14 +0,0 @@ -package events - -// Handler interface -type Handler interface { - // Return number of active listeners - NumListeners() int - // AddListener adds lister to the publisher - AddListener(listener Listener) - // Push pushes event to the listeners - Push(e interface{}) -} - -// Event listener listens for the events produced by worker, worker pool or other service. -type Listener func(event interface{}) diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go deleted file mode 100644 index 2cc76eee..00000000 --- a/interfaces/events/pool_events.go +++ /dev/null @@ -1,66 +0,0 @@ -package events - -// TODO event numbers -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 7800 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventPoolError caused on pool wide errors. - EventPoolError - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL -) - -type P int64 - -func (ev P) String() string { - switch ev { - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventPoolError: - return "EventPoolError" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - } - return "Unknown event type" -} - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event P - - // Payload depends on event type, typically it's worker or error. - Payload interface{} -} diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go deleted file mode 100644 index 2bff1811..00000000 --- a/interfaces/events/worker_events.go +++ /dev/null @@ -1,33 +0,0 @@ -package events - -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 200 - - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog -) - -type W int64 - -func (ev W) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - } - return "Unknown event type" -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event W - - // Worker triggered the event. - Worker interface{} - - // Event specific payload. - Payload interface{} -} diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go deleted file mode 100644 index 97cc945c..00000000 --- a/interfaces/pool/pool.go +++ /dev/null @@ -1,100 +0,0 @@ -package pool - -import ( - "context" - "runtime" - "time" - - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// Pool managed set of inner worker processes. -type Pool interface { - // GetConfig returns pool configuration. - GetConfig() interface{} - - // Exec executes task with payload - Exec(rqs payload.Payload) (payload.Payload, error) - - // ExecWithContext executes task with context which is used with timeout - ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) - - // Workers returns worker list associated with the pool. - Workers() (workers []worker.BaseProcess) - - // Remove worker from the pool. - RemoveWorker(worker worker.BaseProcess) error - - // Destroy all underlying stack (but let them to complete the task). - Destroy(ctx context.Context) -} - -// Configures the pool behaviour. -type Config struct { - // Debug flag creates new fresh worker before every request. - Debug bool - - // NumWorkers defines how many sub-processes can be run at once. This value - // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers int64 - - // MaxJobs defines how many executions is allowed for the worker until - // it's destruction. set 1 to create new process for each new task, 0 to let - // worker handle as many tasks as it can. - MaxJobs int64 - - // AllocateTimeout defines for how long pool will be waiting for a worker to - // be freed to handle the task. Defaults to 60s. - AllocateTimeout time.Duration - - // DestroyTimeout defines for how long pool should be waiting for worker to - // properly destroy, if timeout reached worker will be killed. Defaults to 60s. - DestroyTimeout time.Duration - - // Supervision config to limit worker and pool memory usage. - Supervisor *SupervisorConfig -} - -// InitDefaults enables default config values. -func (cfg *Config) InitDefaults() { - if cfg.NumWorkers == 0 { - cfg.NumWorkers = int64(runtime.NumCPU()) - } - - if cfg.AllocateTimeout == 0 { - cfg.AllocateTimeout = time.Minute - } - - if cfg.DestroyTimeout == 0 { - cfg.DestroyTimeout = time.Minute - } - if cfg.Supervisor == nil { - return - } - cfg.Supervisor.InitDefaults() -} - -type SupervisorConfig struct { - // WatchTick defines how often to check the state of worker. - WatchTick uint64 - - // TTL defines maximum time worker is allowed to live. - TTL uint64 - - // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 - - // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 - - // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 -} - -// InitDefaults enables default config values. -func (cfg *SupervisorConfig) InitDefaults() { - if cfg.WatchTick == 0 { - cfg.WatchTick = 1 - } -} diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go deleted file mode 100644 index 376303df..00000000 --- a/interfaces/worker/factory.go +++ /dev/null @@ -1,20 +0,0 @@ -package worker - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/interfaces/events" -) - -// Factory is responsible of wrapping given command into tasks WorkerProcess. -type Factory interface { - // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. - // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (BaseProcess, error) - // SpawnWorker creates new WorkerProcess process based on given command. - // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (BaseProcess, error) - // Close the factory and underlying connections. - Close() error -} diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go deleted file mode 100644 index ce2c1c5a..00000000 --- a/interfaces/worker/watcher.go +++ /dev/null @@ -1,26 +0,0 @@ -package worker - -import "context" - -type Watcher interface { - // AddToWatch used to add stack to wait its state - AddToWatch(workers []BaseProcess) error - - // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (BaseProcess, error) - - // PutWorker enqueues worker back - PushWorker(w BaseProcess) - - // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew() error - - // Destroy destroys the underlying stack - Destroy(ctx context.Context) - - // WorkersList return all stack w/o removing it from internal storage - WorkersList() []BaseProcess - - // RemoveWorker remove worker from the stack - RemoveWorker(wb BaseProcess) error -} diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go deleted file mode 100644 index 0ac82158..00000000 --- a/interfaces/worker/worker.go +++ /dev/null @@ -1,59 +0,0 @@ -package worker - -import ( - "context" - "fmt" - "time" - - "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// Allocator is responsible for worker allocation in the pool -type Allocator func() (BaseProcess, error) - -type BaseProcess interface { - fmt.Stringer - - // Pid returns worker pid. - Pid() int64 - - // Created returns time worker was created at. - Created() time.Time - - // State return receive-only WorkerProcess state object, state can be used to safely access - // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() internal.State - - // Start used to run Cmd and immediately return - Start() error - - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is - // complete and will return process error (if any), if stderr is presented it's value - // will be wrapped as WorkerError. Method will return error code if php process fails - // to find or Start the script. - Wait() error - - // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop() error - - // Kill kills underlying process, make sure to call Wait() func to gather - // error log from the stderr. Does not waits for process completion! - Kill() error - - // Relay returns attached to worker goridge relay - Relay() relay.Relay - - // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl relay.Relay) -} - -type SyncWorker interface { - // BaseProcess provides basic functionality for the SyncWorker - BaseProcess - // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs payload.Payload) (payload.Payload, error) - // ExecWithContext used to handle Exec with TTL - ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) -} diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio new file mode 100644 index 00000000..fd78d5a5 --- /dev/null +++ b/pkg/doc/pool_workflow.drawio @@ -0,0 +1 @@ +jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE= \ No newline at end of file diff --git a/pkg/doc/workflow.drawio b/pkg/doc/workflow.drawio new file mode 100644 index 00000000..d32d7b2d --- /dev/null +++ b/pkg/doc/workflow.drawio @@ -0,0 +1 @@ +jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE= \ No newline at end of file diff --git a/pkg/events/events.go b/pkg/events/events.go deleted file mode 100755 index 226a0c91..00000000 --- a/pkg/events/events.go +++ /dev/null @@ -1,41 +0,0 @@ -package events - -import ( - "sync" - - "github.com/spiral/roadrunner/v2/interfaces/events" -) - -// HandlerImpl helps to broadcast events to multiple listeners. -type HandlerImpl struct { - listeners []events.Listener - sync.RWMutex // all receivers should be pointers -} - -func NewEventsHandler() events.Handler { - return &HandlerImpl{listeners: make([]events.Listener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *HandlerImpl) NumListeners() int { - eb.Lock() - defer eb.Unlock() - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener events.Listener) { - eb.Lock() - defer eb.Unlock() - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *HandlerImpl) Push(e interface{}) { - // ReadLock here because we are not changing listeners - eb.RLock() - defer eb.RUnlock() - for k := range eb.listeners { - eb.listeners[k](e) - } -} diff --git a/pkg/events/general.go b/pkg/events/general.go new file mode 100755 index 00000000..a09a8759 --- /dev/null +++ b/pkg/events/general.go @@ -0,0 +1,39 @@ +package events + +import ( + "sync" +) + +// HandlerImpl helps to broadcast events to multiple listeners. +type HandlerImpl struct { + listeners []Listener + sync.RWMutex // all receivers should be pointers +} + +func NewEventsHandler() Handler { + return &HandlerImpl{listeners: make([]Listener, 0, 2)} +} + +// NumListeners returns number of event listeners. +func (eb *HandlerImpl) NumListeners() int { + eb.Lock() + defer eb.Unlock() + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *HandlerImpl) AddListener(listener Listener) { + eb.Lock() + defer eb.Unlock() + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *HandlerImpl) Push(e interface{}) { + // ReadLock here because we are not changing listeners + eb.RLock() + defer eb.RUnlock() + for k := range eb.listeners { + eb.listeners[k](e) + } +} diff --git a/pkg/events/interface.go b/pkg/events/interface.go new file mode 100644 index 00000000..ac6c15a4 --- /dev/null +++ b/pkg/events/interface.go @@ -0,0 +1,14 @@ +package events + +// Handler interface +type Handler interface { + // Return number of active listeners + NumListeners() int + // AddListener adds lister to the publisher + AddListener(listener Listener) + // Push pushes event to the listeners + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type Listener func(event interface{}) diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go new file mode 100644 index 00000000..2cc76eee --- /dev/null +++ b/pkg/events/pool_events.go @@ -0,0 +1,66 @@ +package events + +// TODO event numbers +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 7800 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go new file mode 100644 index 00000000..2bff1811 --- /dev/null +++ b/pkg/events/worker_events.go @@ -0,0 +1,33 @@ +package events + +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError W = iota + 200 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type W int64 + +func (ev W) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event W + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go deleted file mode 100755 index b656eff8..00000000 --- a/pkg/pipe/pipe_factory.go +++ /dev/null @@ -1,163 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "go.uber.org/multierr" -) - -// Factory connects to stack using standard -// streams (STDIN, STDOUT pipes). -type Factory struct{} - -// NewPipeFactory returns new factory instance and starts -// listening -func NewPipeFactory() worker.Factory { - return &Factory{} -} - -type SpawnResult struct { - w worker.BaseProcess - err error -} - -// SpawnWorker creates new Process and connects it to goridge relay, -// method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - c := make(chan SpawnResult) - const op = errors.Op("factory_spawn_worker_with_timeout") - go func() { - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) - if err != nil { - c <- SpawnResult{ - w: nil, - err: errors.E(op, err), - } - return - } - - // TODO why out is in? - in, err := cmd.StdoutPipe() - if err != nil { - c <- SpawnResult{ - w: nil, - err: errors.E(op, err), - } - return - } - - // TODO why in is out? - out, err := cmd.StdinPipe() - if err != nil { - c <- SpawnResult{ - w: nil, - err: errors.E(op, err), - } - return - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - c <- SpawnResult{ - w: nil, - err: errors.E(op, err), - } - return - } - - // errors bundle - pid, err := internal.FetchPID(relay) - if pid != w.Pid() || err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - c <- SpawnResult{ - w: nil, - err: errors.E(op, err), - } - return - } - - // everything ok, set ready state - w.State().Set(internal.StateReady) - - // return worker - c <- SpawnResult{ - w: w, - err: nil, - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-c: - if res.err != nil { - return nil, res.err - } - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - const op = errors.Op("factory_spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) - if err != nil { - return nil, errors.E(op, err) - } - - // TODO why out is in? - in, err := cmd.StdoutPipe() - if err != nil { - return nil, errors.E(op, err) - } - - // TODO why in is out? - out, err := cmd.StdinPipe() - if err != nil { - return nil, errors.E(op, err) - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - return nil, errors.E(op, err) - } - - // errors bundle - if pid, err := internal.FetchPID(relay); pid != w.Pid() { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, errors.E(op, err) - } - - // everything ok, set ready state - w.State().Set(internal.StateReady) - return w, nil -} - -// Close the factory. -func (f *Factory) Close() error { - return nil -} diff --git a/pkg/pipe/pipe_factory_spawn_test.go b/pkg/pipe/pipe_factory_spawn_test.go deleted file mode 100644 index 805a24ee..00000000 --- a/pkg/pipe/pipe_factory_spawn_test.go +++ /dev/null @@ -1,490 +0,0 @@ -package pipe - -import ( - "os/exec" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/payload" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_GetState2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, internal.StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Kill2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - assert.Equal(t, internal.StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the Process: error %v", err) - } - wg.Wait() -} - -func Test_Pipe_Start2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop()) -} - -func Test_Pipe_StartError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError3(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError4(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot2(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) - - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") -} - -func Test_Pipe_Invalid2(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) -} - -func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorker(cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Test_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, syncWorker.Wait()) - }() - defer func() { - err := syncWorker.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - go func() { - assert.NoError(t, syncWorker.Wait()) - }() - defer func() { - err := syncWorker.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := syncWorker.Exec(payload.Payload{}) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_String2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_Echo_Slow2(t *testing.T) { - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - - w, err := NewPipeFactory().SpawnWorker(cmd, listener) - if err != nil { - t.Fatal(err) - } - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { - t.Fail() - } - mu.Unlock() - assert.Error(t, w.Stop()) -} - -func Test_Error2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "hello") -} - -func Test_NumExecs2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(1), w.State().NumExecs()) - - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(2), w.State().NumExecs()) - - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(3), w.State().NumExecs()) -} diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go deleted file mode 100755 index a2731294..00000000 --- a/pkg/pipe/pipe_factory_test.go +++ /dev/null @@ -1,510 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/payload" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_GetState(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, internal.StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Kill(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - assert.Equal(t, internal.StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the Process: error %v", err) - } - wg.Wait() -} - -func Test_Pipe_Start(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop()) -} - -func Test_Pipe_StartError(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") -} - -func Test_Pipe_Invalid(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Broken(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) -} - -func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Test_Echo(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, syncWorker.Wait()) - }() - defer func() { - err := syncWorker.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - go func() { - assert.NoError(t, syncWorker.Wait()) - }() - defer func() { - err := syncWorker.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := syncWorker.Exec(payload.Payload{}) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_String(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_Echo_Slow(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) - if err != nil { - t.Fatal(err) - } - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { - t.Fail() - } - mu.Unlock() - assert.Error(t, w.Stop()) -} - -func Test_Error(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "hello") -} - -func Test_NumExecs(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } - - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(1), w.State().NumExecs()) - - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(2), w.State().NumExecs()) - - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(3), w.State().NumExecs()) -} diff --git a/pkg/pool/config.go b/pkg/pool/config.go index e3e2d3cd..cf4aaaee 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() { type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. - WatchTick uint64 `mapstructure:"watch_tick"` + WatchTick uint64 // TTL defines maximum time worker is allowed to live. - TTL uint64 `mapstructure:"ttl"` + TTL uint64 // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 `mapstructure:"idle_ttl"` + IdleTTL uint64 // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 `mapstructure:"exec_ttl"` + ExecTTL uint64 // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` + MaxWorkerMemory uint64 } // InitDefaults enables default config values. diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go new file mode 100644 index 00000000..f3fe4065 --- /dev/null +++ b/pkg/pool/interface.go @@ -0,0 +1,29 @@ +package pool + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec executes task with payload + Exec(rqs payload.Payload) (payload.Payload, error) + + // ExecWithContext executes task with context which is used with timeout + ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []*worker.SyncWorkerImpl) + + // Remove worker from the pool. + RemoveWorker(worker worker.SyncWorker) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index d1b726c1..bb416b29 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -6,13 +6,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/transport" + "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) @@ -20,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) type Options func(p *StaticPool) @@ -34,7 +32,7 @@ type StaticPool struct { cmd Command // creates and connects to stack - factory worker.Factory + factory transport.Factory // distributes the events events events.Handler @@ -43,7 +41,7 @@ type StaticPool struct { listeners []events.Listener // manages worker states and TTLs - ww worker.Watcher + ww workerWatcher.Watcher // allocate new worker allocator worker.Allocator @@ -53,7 +51,7 @@ type StaticPool struct { } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) { +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) @@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co cfg: cfg, cmd: cmd, factory: factory, - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), } // add pool options @@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co } p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) - p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { @@ -124,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} { } // Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { +func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) { return sp.ww.WorkersList() } -func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { +func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { return sp.ww.RemoveWorker(wb) } @@ -153,12 +151,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { // worker want's to be terminated // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { - sp.stopWorker(&w) + sp.stopWorker(w) return sp.Exec(p) } - err = sp.checkMaxJobs(&w) + err = sp.checkMaxJobs(w) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -183,11 +181,11 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p // worker want's to be terminated if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { - sp.stopWorker(&w) + sp.stopWorker(w) return sp.ExecWithContext(ctx, p) } - err = sp.checkMaxJobs(&w) + err = sp.checkMaxJobs(w) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -195,30 +193,30 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p return rsp, nil } -func (sp *StaticPool) stopWorker(w *worker.SyncWorker) { +func (sp *StaticPool) stopWorker(w worker.SyncWorker) { const op = errors.Op("static_pool_stop_worker") - (*w).State().Set(internal.StateInvalid) - err := (*w).Stop() + w.State().Set(internal.StateInvalid) + err := w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error { +func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { const op = errors.Op("static_pool_check_max_jobs") - if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err := sp.ww.AllocateNew() if err != nil { return errors.E(op, err) } } else { - sp.ww.PushWorker(*w) + sp.ww.PushWorker(w) } return nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) { // GetFreeWorker function consumes context with timeout w, err := sp.ww.GetFreeWorker(ctxGetFree) if err != nil { @@ -230,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke // else if err not nil - return error return nil, errors.E(op, err) } - return w.(worker.SyncWorker), nil + return w, nil } // Destroy all underlying stack (but let them to complete the task). @@ -239,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.SyncWorker) (payload.Payload, error) { const op = errors.Op("error encoder") // just push event if on any stage was timeout error if errors.Is(errors.ExecTTL, err) { @@ -277,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (worker.BaseProcess, error) { +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (*worker.SyncWorkerImpl, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) @@ -286,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return nil, err } - sw, err := syncWorker.From(w) - if err != nil { - return nil, err - } + sw := worker.From(w) sp.events.Push(events.PoolEvent{ Event: events.EventWorkerConstruct, @@ -305,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, err } - r, err := sw.(worker.SyncWorker).Exec(p) + r, err := sw.Exec(p) if stopErr := sw.Stop(); stopErr != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) @@ -315,9 +310,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) { const op = errors.Op("allocate workers") - var workers []worker.BaseProcess + var workers []worker.SyncWorker // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { @@ -326,11 +321,7 @@ func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, e return nil, errors.E(op, errors.WorkerAllocate, err) } - sw, err := syncWorker.From(w) - if err != nil { - return nil, errors.E(op, err) - } - workers = append(workers, sw) + workers = append(workers, w) } return workers, nil } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 348f5297..a877b28f 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,10 +12,10 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/stretchr/testify/assert" ) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 19cda759..2bae8f9e 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -6,11 +6,10 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -20,7 +19,7 @@ const MB = 1024 * 1024 const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck type Supervised interface { - pool.Pool + Pool // Start used to start watching process for all pool workers Start() } @@ -28,12 +27,12 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig events events.Handler - pool pool.Pool + pool Pool stopCh chan struct{} mu *sync.RWMutex } -func supervisorWrapper(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, events: events, @@ -101,13 +100,13 @@ func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervised) Workers() (workers []worker.BaseProcess) { +func (sp *supervised) Workers() (workers []*worker.SyncWorkerImpl) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() } -func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { +func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { return sp.pool.RemoveWorker(worker) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index a9424cd5..58f63b7e 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/spiral/roadrunner/v2/tools" "github.com/stretchr/testify/assert" ) diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go deleted file mode 100755 index 8f99ff73..00000000 --- a/pkg/socket/socket_factory.go +++ /dev/null @@ -1,229 +0,0 @@ -package socket - -import ( - "context" - "net" - "os/exec" - "sync" - "time" - - "github.com/shirou/gopsutil/process" - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - - "go.uber.org/multierr" - "golang.org/x/sync/errgroup" -) - -// Factory connects to external stack using socket server. -type Factory struct { - // listens for incoming connections from underlying processes - ls net.Listener - - // relay connection timeout - tout time.Duration - - // sockets which are waiting for process association - relays sync.Map - - ErrCh chan error -} - -// todo: review - -// NewSocketServer returns Factory attached to a given socket listener. -// tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory { - f := &Factory{ - ls: ls, - tout: tout, - relays: sync.Map{}, - ErrCh: make(chan error, 10), - } - - // Be careful - // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go - // https://github.com/golang/go/issues/5045 - go func() { - f.ErrCh <- f.listen() - }() - - return f -} - -// blocking operation, returns an error -func (f *Factory) listen() error { - errGr := &errgroup.Group{} - errGr.Go(func() error { - for { - conn, err := f.ls.Accept() - if err != nil { - return err - } - - rl := socket.NewSocketRelay(conn) - pid, err := internal.FetchPID(rl) - if err != nil { - return err - } - - f.attachRelayToPid(pid, rl) - } - }) - - return errGr.Wait() -} - -type socketSpawn struct { - w worker.BaseProcess - err error -} - -// SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - const op = errors.Op("factory_spawn_worker_with_timeout") - c := make(chan socketSpawn) - go func() { - ctx, cancel := context.WithTimeout(ctx, f.tout) - defer cancel() - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) - if err != nil { - c <- socketSpawn{ - w: nil, - err: err, - } - return - } - - err = w.Start() - if err != nil { - c <- socketSpawn{ - w: nil, - err: errors.E(op, err), - } - return - } - - rl, err := f.findRelayWithContext(ctx, w) - if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - - c <- socketSpawn{ - w: nil, - err: errors.E(op, err), - } - return - } - - w.AttachRelay(rl) - w.State().Set(internal.StateReady) - - c <- socketSpawn{ - w: w, - err: nil, - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-c: - if res.err != nil { - return nil, res.err - } - - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - const op = errors.Op("factory_spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) - if err != nil { - return nil, err - } - - err = w.Start() - if err != nil { - return nil, errors.E(op, err) - } - - rl, err := f.findRelay(w) - if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, err - } - - w.AttachRelay(rl) - w.State().Set(internal.StateReady) - - return w, nil -} - -// Close socket factory and underlying socket connection. -func (f *Factory) Close() error { - return f.ls.Close() -} - -// waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { - ticker := time.NewTicker(time.Millisecond * 100) - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-ticker.C: - _, err := process.NewProcess(int32(w.Pid())) - if err != nil { - return nil, err - } - default: - tmp, ok := f.relays.Load(w.Pid()) - if !ok { - continue - } - return tmp.(*socket.Relay), nil - } - } -} - -func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { - const op = errors.Op("factory_find_relay") - // poll every 1ms for the relay - pollDone := time.NewTimer(f.tout) - for { - select { - case <-pollDone.C: - return nil, errors.E(op, errors.Str("relay timeout")) - default: - tmp, ok := f.relays.Load(w.Pid()) - if !ok { - continue - } - return tmp.(*socket.Relay), nil - } - } -} - -// chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { - f.relays.Store(pid, relay) -} - -// deletes relay chan associated with specific pid -func (f *Factory) removeRelayFromPid(pid int64) { - f.relays.Delete(pid) -} diff --git a/pkg/socket/socket_factory_spawn_test.go b/pkg/socket/socket_factory_spawn_test.go deleted file mode 100644 index 2f21e408..00000000 --- a/pkg/socket/socket_factory_spawn_test.go +++ /dev/null @@ -1,507 +0,0 @@ -package socket - -import ( - "net" - "os/exec" - "sync" - "syscall" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_Tcp_Start2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartCloseFactory2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - f := NewSocketServer(ls, time.Minute) - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - w, err := f.SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartError2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err = cmd.Start() - if err != nil { - t.Errorf("error executing the command: error %v", err) - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Failboot2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") -} - -func Test_Tcp_Invalid2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Broken2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := w.Wait() - assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") - }() - - defer func() { - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - assert.Error(t, err2) - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - wg.Wait() -} - -func Test_Tcp_Echo2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Unix_Start2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err := ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Unix_Failboot2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err := ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") -} - -func Test_Unix_Timeout2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err := ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "relay timeout") -} - -func Test_Unix_Invalid2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err := ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Unix_Broken2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err := ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := w.Wait() - assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") - }() - - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) - wg.Wait() -} - -func Test_Unix_Echo2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err := ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(b, err) - defer func() { - err := ls.Close() - assert.NoError(b, err) - }() - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := f.SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - go func() { - assert.NoError(b, w.Wait()) - }() - - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(b, err) - defer func() { - err := ls.Close() - assert.NoError(b, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { - defer func() { - _ = syscall.Unlink("sock.unix") - }() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := f.SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { - defer func() { - _ = syscall.Unlink("sock.unix") - }() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go deleted file mode 100755 index 983f3e8e..00000000 --- a/pkg/socket/socket_factory_test.go +++ /dev/null @@ -1,590 +0,0 @@ -package socket - -import ( - "context" - "net" - "os/exec" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_Tcp_Start(t *testing.T) { - ctx := context.Background() - time.Sleep(time.Millisecond * 10) // to ensure free socket - - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartCloseFactory(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - f := NewSocketServer(ls, time.Minute) - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartError(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err = cmd.Start() - if err != nil { - t.Errorf("error executing the command: error %v", err) - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Failboot(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") -} - -func Test_Tcp_Timeout(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") -} - -func Test_Tcp_Invalid(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Broken(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := w.Wait() - assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") - }() - - defer func() { - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - assert.Error(t, err2) - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - wg.Wait() -} - -func Test_Tcp_Echo(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Unix_Start(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Unix_Failboot(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - ctx := context.Background() - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") -} - -func Test_Unix_Timeout(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - ctx := context.Background() - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") -} - -func Test_Unix_Invalid(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Unix_Broken(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := w.Wait() - assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") - }() - - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) - wg.Wait() -} - -func Test_Unix_Echo(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - go func() { - assert.NoError(b, w.Wait()) - }() - - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err := ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go new file mode 100644 index 00000000..299ac95f --- /dev/null +++ b/pkg/transport/interface.go @@ -0,0 +1,21 @@ +package transport + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + // Close the factory and underlying connections. + Close() error +} diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go new file mode 100755 index 00000000..dd7c5841 --- /dev/null +++ b/pkg/transport/pipe/pipe_factory.go @@ -0,0 +1,162 @@ +package pipe + +import ( + "context" + "os/exec" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + "go.uber.org/multierr" +) + +// Factory connects to stack using standard +// streams (STDIN, STDOUT pipes). +type Factory struct{} + +// NewPipeFactory returns new factory instance and starts +// listening +func NewPipeFactory() *Factory { + return &Factory{} +} + +type SpawnResult struct { + w *worker.Process + err error +} + +// SpawnWorker creates new Process and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + c := make(chan SpawnResult) + const op = errors.Op("factory_spawn_worker_with_timeout") + go func() { + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // errors bundle + pid, err := internal.FetchPID(relay) + if pid != w.Pid() || err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // everything ok, set ready state + w.State().Set(internal.StateReady) + + // return worker + c <- SpawnResult{ + w: w, + err: nil, + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, errors.E(op, err) + } + + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + return nil, errors.E(op, err) + } + + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + return nil, errors.E(op, err) + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + // errors bundle + if pid, err := internal.FetchPID(relay); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + + // everything ok, set ready state + w.State().Set(internal.StateReady) + return w, nil +} + +// Close the factory. +func (f *Factory) Close() error { + return nil +} diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go new file mode 100644 index 00000000..2e5bbcd5 --- /dev/null +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -0,0 +1,456 @@ +package pipe + +import ( + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, internal.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + assert.NoError(t, w.Stop()) +} + +func Test_Kill2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, internal.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError3(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError4(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/failboot.php") + w, err := NewPipeFactory().SpawnWorker(cmd) + + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Pipe_Invalid2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/invalid.php") + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) +} + +func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorker(cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(1), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(2), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(3), w.State().NumExecs()) +} diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go new file mode 100755 index 00000000..fa37ac0f --- /dev/null +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -0,0 +1,478 @@ +package pipe + +import ( + "context" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, internal.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Kill(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, internal.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot(t *testing.T) { + cmd := exec.Command("php", "../../../tests/failboot.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Pipe_Invalid(t *testing.T) { + cmd := exec.Command("php", "../../../tests/invalid.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(1), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(2), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(3), w.State().NumExecs()) +} diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go new file mode 100755 index 00000000..ccd2b0bf --- /dev/null +++ b/pkg/transport/socket/socket_factory.go @@ -0,0 +1,228 @@ +package socket + +import ( + "context" + "net" + "os/exec" + "sync" + "time" + + "github.com/shirou/gopsutil/process" + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + + "go.uber.org/multierr" + "golang.org/x/sync/errgroup" +) + +// Factory connects to external stack using socket server. +type Factory struct { + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // sockets which are waiting for process association + relays sync.Map + + ErrCh chan error +} + +// todo: review + +// NewSocketServer returns Factory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection +func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { + f := &Factory{ + ls: ls, + tout: tout, + relays: sync.Map{}, + ErrCh: make(chan error, 10), + } + + // Be careful + // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go + // https://github.com/golang/go/issues/5045 + go func() { + f.ErrCh <- f.listen() + }() + + return f +} + +// blocking operation, returns an error +func (f *Factory) listen() error { + errGr := &errgroup.Group{} + errGr.Go(func() error { + for { + conn, err := f.ls.Accept() + if err != nil { + return err + } + + rl := socket.NewSocketRelay(conn) + pid, err := internal.FetchPID(rl) + if err != nil { + return err + } + + f.attachRelayToPid(pid, rl) + } + }) + + return errGr.Wait() +} + +type socketSpawn struct { + w *worker.Process + err error +} + +// SpawnWorker creates Process and connects it to appropriate relay or returns error +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker_with_timeout") + c := make(chan socketSpawn) + go func() { + ctx, cancel := context.WithTimeout(ctx, f.tout) + defer cancel() + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + c <- socketSpawn{ + w: nil, + err: err, + } + return + } + + err = w.Start() + if err != nil { + c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + } + return + } + + rl, err := f.findRelayWithContext(ctx, w) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + + c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + } + return + } + + w.AttachRelay(rl) + w.State().Set(internal.StateReady) + + c <- socketSpawn{ + w: w, + err: nil, + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, err + } + + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + rl, err := f.findRelay(w) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, err + } + + w.AttachRelay(rl) + w.State().Set(internal.StateReady) + + return w, nil +} + +// Close socket factory and underlying socket connection. +func (f *Factory) Close() error { + return f.ls.Close() +} + +// waits for Process to connect over socket and returns associated relay of timeout +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { + ticker := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } + default: + tmp, ok := f.relays.Load(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { + const op = errors.Op("factory_find_relay") + // poll every 1ms for the relay + pollDone := time.NewTimer(f.tout) + for { + select { + case <-pollDone.C: + return nil, errors.E(op, errors.Str("relay timeout")) + default: + tmp, ok := f.relays.Load(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +// chan to store relay associated with specific pid +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { + f.relays.Store(pid, relay) +} + +// deletes relay chan associated with specific pid +func (f *Factory) removeRelayFromPid(pid int64) { + f.relays.Delete(pid) +} diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go new file mode 100644 index 00000000..0e29e7d2 --- /dev/null +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -0,0 +1,489 @@ +package socket + +import ( + "net" + "os/exec" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_Tcp_Start2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + f := NewSocketServer(ls, time.Minute) + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} + +func Test_Tcp_Invalid2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + wg.Wait() +} + +func Test_Tcp_Echo2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Unix_Timeout2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "relay timeout") +} + +func Test_Unix_Invalid2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + wg.Wait() +} + +func Test_Unix_Echo2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err := ls.Close() + assert.NoError(b, err) + }() + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err := ls.Close() + assert.NoError(b, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go new file mode 100755 index 00000000..f55fc3dd --- /dev/null +++ b/pkg/transport/socket/socket_factory_test.go @@ -0,0 +1,572 @@ +package socket + +import ( + "context" + "net" + "os/exec" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_Tcp_Start(t *testing.T) { + ctx := context.Background() + time.Sleep(time.Millisecond * 10) // to ensure free socket + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + f := NewSocketServer(ls, time.Minute) + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} + +func Test_Tcp_Timeout(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Tcp_Invalid(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + wg.Wait() +} + +func Test_Tcp_Echo(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Unix_Timeout(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Unix_Invalid(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + wg.Wait() +} + +func Test_Unix_Echo(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go new file mode 100644 index 00000000..9d74ae10 --- /dev/null +++ b/pkg/worker/interface.go @@ -0,0 +1,56 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" +) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop() error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() relay.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl relay.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs payload.Payload) (payload.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 8314c039..1a0393fb 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,50 +8,67 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" ) -type syncWorker struct { - w worker.BaseProcess +// Allocator is responsible for worker allocation in the pool +type Allocator func() (*SyncWorkerImpl, error) + +type SyncWorkerImpl struct { + process *Process } // From creates SyncWorker from BaseProcess -func From(w worker.BaseProcess) (worker.SyncWorker, error) { - return &syncWorker{ - w: w, - }, nil +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + } +} + +// FromSync creates BaseProcess from SyncWorkerImpl +func FromSync(w *SyncWorkerImpl) BaseProcess { + return &Process{ + created: w.process.created, + events: w.process.events, + state: w.process.state, + cmd: w.process.cmd, + pid: w.process.pid, + stderr: w.process.stderr, + endState: w.process.endState, + relay: w.process.relay, + rd: w.process.rd, + } } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.w.State().Value() != internal.StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + if tw.process.State().Value() != internal.StateReady { + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() return rsp, nil } @@ -62,7 +79,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - if tw.w.State().Value() != internal.StateReady { + if tw.process.State().Value() != internal.StateReady { c <- wexec{ payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } c <- wexec{ payload: payload.Payload{}, @@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() c <- wexec{ payload: rsp, @@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } } -func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() @@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { frameR := frame.NewFrame() - err = tw.w.Relay().Receive(frameR) + err = tw.process.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return pl, nil } -func (tw *syncWorker) String() string { - return tw.w.String() +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() } -func (tw *syncWorker) Pid() int64 { - return tw.w.Pid() +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() } -func (tw *syncWorker) Created() time.Time { - return tw.w.Created() +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() } -func (tw *syncWorker) State() internal.State { - return tw.w.State() +func (tw *SyncWorkerImpl) State() internal.State { + return tw.process.State() } -func (tw *syncWorker) Start() error { - return tw.w.Start() +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() } -func (tw *syncWorker) Wait() error { - return tw.w.Wait() +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() } -func (tw *syncWorker) Stop() error { - return tw.w.Stop() +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() } -func (tw *syncWorker) Kill() error { - return tw.w.Kill() +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() } -func (tw *syncWorker) Relay() relay.Relay { - return tw.w.Relay() +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() } -func (tw *syncWorker) AttachRelay(rl relay.Relay) { - tw.w.AttachRelay(rl) +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index 40988b06..df556e93 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) { w, _ := InitBaseWorker(cmd) - syncWorker, err := From(w) - if err != nil { - t.Fatal(err) - } + sw := From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index bf70d646..8fd71cca 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,10 +13,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -78,14 +76,14 @@ type Process struct { } // InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) { +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { const op = errors.Op("init_base_worker") if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } w := &Process{ created: time.Now(), - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), @@ -198,7 +196,7 @@ func (w *Process) Wait() error { // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first - // and then w.cmd.Wait return an error + // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(internal.StateErrored) diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go new file mode 100644 index 00000000..13991541 --- /dev/null +++ b/pkg/worker_watcher/interface.go @@ -0,0 +1,30 @@ +package worker_watcher //nolint:golint,stylecheck + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []worker.SyncWorker) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) + + // PutWorker enqueues worker back + PushWorker(w worker.SyncWorker) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []*worker.SyncWorkerImpl + + // RemoveWorker remove worker from the stack + RemoveWorker(wb worker.SyncWorker) error +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index c87e8b65..2d23d0e9 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,12 +5,12 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []worker.BaseProcess + workers []*worker.SyncWorkerImpl mutex sync.RWMutex destroy bool actualNumOfWorkers uint64 @@ -20,7 +20,7 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]worker.BaseProcess, 0, w), + workers: make([]*worker.SyncWorkerImpl, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } @@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) { stack.mutex.Lock() defer stack.mutex.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) + stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) } func (stack *Stack) IsEmpty() bool { @@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool { return len(stack.workers) == 0 } -func (stack *Stack) Pop() (worker.BaseProcess, bool) { +func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() @@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.BaseProcess { +func (stack *Stack) Workers() []*worker.SyncWorkerImpl { stack.mutex.Lock() defer stack.mutex.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) + workersCopy := make([]*worker.SyncWorkerImpl, 0, 1) // copy for _, v := range stack.workers { - workersCopy = append(workersCopy, v) + if v != nil { + workersCopy = append(workersCopy, v) + } } return workersCopy diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 86af2043..5287a6dc 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -5,24 +5,25 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []worker.BaseProcess{}, stack.workers) + assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) } func TestStack_Push(t *testing.T) { stack := NewWorkersStack(1) - w, err := workerImpl.InitBaseWorker(&exec.Cmd{}) + w, err := worker.InitBaseWorker(&exec.Cmd{}) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) } @@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) _, _ = stack.Pop() @@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) { func TestStack_FindAndRemoveByPid(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.FindAndRemoveByPid(w.Pid()) @@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) assert.Equal(t, false, stack.IsEmpty()) @@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) { func TestStack_Workers(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) wrks := stack.Workers() assert.Equal(t, 1, len(wrks)) @@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) { func TestStack_Reset(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.Reset() assert.Equal(t, uint64(0), stack.actualNumOfWorkers) @@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) { func TestStack_Destroy(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Destroy(context.Background()) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } @@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) { func TestStack_DestroyWithWait(t *testing.T) { stack := NewWorkersStack(2) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Push(sw) assert.Equal(t, uint64(2), stack.actualNumOfWorkers) go func() { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b0d39165..f87bd021 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,13 +5,13 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher { ww := &workerWatcher{ stack: NewWorkersStack(uint64(numWorkers)), allocator: allocator, @@ -28,18 +28,18 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { +func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.BaseProcess) { + go func(swc worker.SyncWorker) { ww.wait(swc) }(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error { return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { +func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { +func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.BaseProcess { +func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl { return ww.stack.Workers() } -func (ww *workerWatcher) wait(w worker.BaseProcess) { +func (ww *workerWatcher) wait(w worker.SyncWorker) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { @@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { +func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { go func() { ww.wait(wb) }() diff --git a/plugins/http/handler.go b/plugins/http/handler.go index ecdcb2c0..0e7481b5 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,8 +10,8 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index b1c68d89..144148af 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -15,9 +15,8 @@ import ( "github.com/hashicorp/go-multierror" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -104,7 +103,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se s.cfg.Env[RR_HTTP] = "true" - s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{ + s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, @@ -304,7 +303,12 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Workers returns associated pool workers func (s *Plugin) Workers() []worker.BaseProcess { - return s.pool.Workers() + workers := s.pool.Workers() + baseWorkers := make([]worker.BaseProcess, 0, len(workers)) + for i := 0; i < len(workers); i++ { + baseWorkers = append(baseWorkers, worker.FromSync(workers[i])) + } + return baseWorkers } // Name returns endure.Named interface implementation @@ -322,7 +326,7 @@ func (s *Plugin) Reset() error { s.pool = nil var err error - s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{ + s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go index 27139ae1..8e3b922b 100644 --- a/plugins/informer/interface.go +++ b/plugins/informer/interface.go @@ -1,6 +1,6 @@ package informer -import "github.com/spiral/roadrunner/v2/interfaces/worker" +import "github.com/spiral/roadrunner/v2/pkg/worker" // Informer used to get workers from particular plugin or set of plugins type Informer interface { diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index 7200c51f..416c0112 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -3,7 +3,7 @@ package informer import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 98b5681c..c036ae96 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -1,7 +1,7 @@ package informer import ( - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/tools" ) diff --git a/plugins/server/interface.go b/plugins/server/interface.go index a2d8b92b..fe04b85b 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -4,10 +4,10 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // Env variables type alias @@ -16,6 +16,6 @@ type Env map[string]string // Server creates workers for the application. type Server interface { CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) + NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 0c9c49ea..16123786 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,16 +8,17 @@ import ( "strings" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/transport" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" // core imports - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/socket" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/socket" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/utils" ) @@ -33,7 +34,7 @@ const RR_RPC = "RR_RPC" //nolint:golint,stylecheck type Plugin struct { cfg Config log logger.Logger - factory worker.Factory + factory transport.Factory } // Init application provider. @@ -115,7 +116,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) { +func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("server_plugin_new_worker") list := make([]events.Listener, 0, len(listeners)) @@ -157,7 +158,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en } // creates relay and worker factory. -func (server *Plugin) initFactory() (worker.Factory, error) { +func (server *Plugin) initFactory() (transport.Factory, error) { const op = errors.Op("server_plugin_init_factory") if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" { return pipe.NewPipeFactory(), nil diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index ee6f795d..45931a49 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -10,8 +10,8 @@ import ( "runtime" "strings" - "github.com/spiral/roadrunner/v2/pkg/pipe" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" @@ -23,10 +23,10 @@ import ( ) func TestHandler_Echo(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -38,7 +38,7 @@ func TestHandler_Echo(t *testing.T) { h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -74,10 +74,10 @@ func Test_HandlerErrors(t *testing.T) { } func TestHandler_Headers(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -86,13 +86,13 @@ func TestHandler_Headers(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8078", Handler: h} @@ -135,10 +135,10 @@ func TestHandler_Headers(t *testing.T) { } func TestHandler_Empty_User_Agent(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -147,13 +147,13 @@ func TestHandler_Empty_User_Agent(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8088", Handler: h} @@ -195,10 +195,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) { } func TestHandler_User_Agent(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -207,13 +207,13 @@ func TestHandler_User_Agent(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8088", Handler: h} @@ -255,10 +255,10 @@ func TestHandler_User_Agent(t *testing.T) { } func TestHandler_Cookies(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -267,13 +267,13 @@ func TestHandler_Cookies(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8079", Handler: h} @@ -320,10 +320,10 @@ func TestHandler_Cookies(t *testing.T) { } func TestHandler_JsonPayload_POST(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -332,13 +332,13 @@ func TestHandler_JsonPayload_POST(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8090", Handler: h} @@ -384,10 +384,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) { } func TestHandler_JsonPayload_PUT(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -396,13 +396,13 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8081", Handler: h} @@ -444,10 +444,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { } func TestHandler_JsonPayload_PATCH(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -456,13 +456,13 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8082", Handler: h} @@ -504,10 +504,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { } func TestHandler_FormData_POST(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -516,13 +516,13 @@ func TestHandler_FormData_POST(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8083", Handler: h} @@ -577,10 +577,10 @@ func TestHandler_FormData_POST(t *testing.T) { } func TestHandler_FormData_POST_Overwrite(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -589,13 +589,13 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8083", Handler: h} @@ -650,10 +650,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { } func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -662,13 +662,13 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8083", Handler: h} @@ -722,10 +722,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { } func TestHandler_FormData_PUT(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -734,13 +734,13 @@ func TestHandler_FormData_PUT(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":17834", Handler: h} @@ -794,10 +794,10 @@ func TestHandler_FormData_PUT(t *testing.T) { } func TestHandler_FormData_PATCH(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -806,13 +806,13 @@ func TestHandler_FormData_PATCH(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8085", Handler: h} @@ -866,10 +866,10 @@ func TestHandler_FormData_PATCH(t *testing.T) { } func TestHandler_Multipart_POST(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -878,13 +878,13 @@ func TestHandler_Multipart_POST(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8019", Handler: h} @@ -980,10 +980,10 @@ func TestHandler_Multipart_POST(t *testing.T) { } func TestHandler_Multipart_PUT(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -992,13 +992,13 @@ func TestHandler_Multipart_PUT(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8020", Handler: h} @@ -1094,10 +1094,10 @@ func TestHandler_Multipart_PUT(t *testing.T) { } func TestHandler_Multipart_PATCH(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1106,13 +1106,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8021", Handler: h} @@ -1210,10 +1210,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) { } func TestHandler_Error(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1222,13 +1222,13 @@ func TestHandler_Error(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1256,10 +1256,10 @@ func TestHandler_Error(t *testing.T) { } func TestHandler_Error2(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1268,13 +1268,13 @@ func TestHandler_Error2(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1302,10 +1302,10 @@ func TestHandler_Error2(t *testing.T) { } func TestHandler_Error3(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1314,13 +1314,13 @@ func TestHandler_Error3(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1361,10 +1361,10 @@ func TestHandler_Error3(t *testing.T) { } func TestHandler_ResponseDuration(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1373,13 +1373,13 @@ func TestHandler_ResponseDuration(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1422,10 +1422,10 @@ func TestHandler_ResponseDuration(t *testing.T) { } func TestHandler_ResponseDurationDelayed(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1434,13 +1434,13 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1482,10 +1482,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { } func TestHandler_ErrorDuration(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1494,13 +1494,13 @@ func TestHandler_ErrorDuration(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1556,10 +1556,10 @@ func TestHandler_IP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1568,13 +1568,13 @@ func TestHandler_IP(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} @@ -1617,10 +1617,10 @@ func TestHandler_XRealIP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1629,13 +1629,13 @@ func TestHandler_XRealIP(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8179", Handler: h} @@ -1683,10 +1683,10 @@ func TestHandler_XForwardedFor(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1695,13 +1695,13 @@ func TestHandler_XForwardedFor(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} @@ -1748,10 +1748,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1760,13 +1760,13 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} @@ -1796,10 +1796,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { } func BenchmarkHandler_Listen_Echo(b *testing.B) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1808,13 +1808,13 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { b.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(b, err) hs := &http.Server{Addr: ":8177", Handler: h} diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index c530f7d1..26b28165 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -19,7 +19,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index e03638d2..dd986902 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -16,8 +16,8 @@ import ( "time" j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pipe" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index ba281d02..2e5af988 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -4,18 +4,18 @@ import ( "context" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &poolImpl.SupervisorConfig{ + Supervisor: &pool.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, @@ -50,10 +50,16 @@ func (p1 *Plugin1) Name() string { } func (p1 *Plugin1) Workers() []worker.BaseProcess { - pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) + p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) if err != nil { panic(err) } - return pool.Workers() + workers := p.Workers() + baseWorkers := make([]worker.BaseProcess, 0, len(workers)) + for i := 0; i < len(workers); i++ { + baseWorkers = append(baseWorkers, worker.FromSync(workers[i])) + } + + return baseWorkers } diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index 5eb2fed1..f53b228f 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -5,8 +5,8 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" @@ -80,11 +80,7 @@ func (f *Foo) Serve() chan error { } // test that our worker is functional - sw, err := worker.From(w) - if err != nil { - errCh <- err - return errCh - } + sw := worker.From(w) rsp, err := sw.Exec(r) if err != nil { diff --git a/tests/plugins/server/plugin_sockets.go b/tests/plugins/server/plugin_sockets.go index ede67ded..0b2857e3 100644 --- a/tests/plugins/server/plugin_sockets.go +++ b/tests/plugins/server/plugin_sockets.go @@ -4,8 +4,8 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" @@ -60,11 +60,7 @@ func (f *Foo2) Serve() chan error { } // test that our worker is functional - sw, err := worker.From(w) - if err != nil { - errCh <- err - return errCh - } + sw := worker.From(w) rsp, err := sw.Exec(r) if err != nil { diff --git a/tests/plugins/server/plugin_tcp.go b/tests/plugins/server/plugin_tcp.go index 98c13b2b..ef4cea39 100644 --- a/tests/plugins/server/plugin_tcp.go +++ b/tests/plugins/server/plugin_tcp.go @@ -4,8 +4,8 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" @@ -60,11 +60,7 @@ func (f *Foo3) Serve() chan error { } // test that our worker is functional - sw, err := worker.From(w) - if err != nil { - errCh <- err - return errCh - } + sw := worker.From(w) rsp, err := sw.Exec(r) if err != nil { diff --git a/tools/process.go b/tools/process.go index 50fe1616..a01f2b24 100644 --- a/tools/process.go +++ b/tools/process.go @@ -3,7 +3,7 @@ package tools import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // ProcessState provides information about specific worker. -- cgit v1.2.3