diff options
author | Valery Piashchynski <[email protected]> | 2020-12-17 12:13:55 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-12-17 12:13:55 +0300 |
commit | ee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (patch) | |
tree | 2c99d4c6e2b2e9e3fa155d5d68a9d471c9aeeb9b | |
parent | a1dc59cabb6e63eab232922f4eb5a19dbd168f44 (diff) | |
parent | edf924b37bcdad14eb31014c571ab58720aa178f (diff) |
Merge pull request #452 from spiral/refactor/splitv2.0.0-alpha23
Refactor/split
57 files changed, 1850 insertions, 1657 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f69d672a..04545212 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,7 +14,7 @@ jobs: golang: name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) runs-on: ${{ matrix.os }} - timeout-minutes: 20 + timeout-minutes: 25 strategy: fail-fast: false matrix: @@ -65,7 +65,11 @@ jobs: - name: Run golang tests on Windows without codecov if: ${{ matrix.os == 'windows-latest' }} run: | - go test -v -race -cover -tags=debug . + go test -v -race -cover -tags=debug ./pkg/pipe + go test -v -race -cover -tags=debug ./pkg/pool + go test -v -race -cover -tags=debug ./pkg/socket + go test -v -race -cover -tags=debug ./pkg/worker + go test -v -race -cover -tags=debug ./pkg/worker_watcher go test -v -race -cover -tags=debug ./plugins/rpc go test -v -race -cover -tags=debug ./plugins/rpc/tests go test -v -race -cover -tags=debug ./plugins/config/tests @@ -86,7 +90,11 @@ jobs: if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/lib.txt -covermode=atomic . + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker_watcher.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc_config.txt -covermode=atomic ./plugins/rpc go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./plugins/rpc/tests go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/plugin_config.txt -covermode=atomic ./plugins/config/tests @@ -24,7 +24,11 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test: ## Run application tests - go test -v -race -cover -tags=debug -covermode=atomic . + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker + go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc/tests go test -v -race -cover -tags=debug -covermode=atomic ./plugins/config/tests @@ -384,6 +384,7 @@ github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U= github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM= github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go new file mode 100644 index 00000000..01f64d73 --- /dev/null +++ b/interfaces/events/handler.go @@ -0,0 +1,10 @@ +package events + +type Handler interface { + NumListeners() int + AddListener(listener EventListener) + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type EventListener func(event interface{}) diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go new file mode 100644 index 00000000..cc32f6b2 --- /dev/null +++ b/interfaces/events/pool_events.go @@ -0,0 +1,65 @@ +package events + +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 7800 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // todo: EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + EventTTL + + // todo: EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go new file mode 100644 index 00000000..497f0a06 --- /dev/null +++ b/interfaces/events/worker_events.go @@ -0,0 +1,34 @@ +package events + +// EventWorkerKill thrown after WorkerProcess is being forcefully killed. +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError E = iota + 200 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type E int64 + +func (ev E) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event E + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go new file mode 100755 index 00000000..51b73501 --- /dev/null +++ b/interfaces/factory/factory.go @@ -0,0 +1,22 @@ +package worker + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/interfaces/worker" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with contex. + // Process must not be started. + SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error) + + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd) (worker.BaseProcess, error) + + // Close the factory and underlying connections. + Close(ctx context.Context) error +} diff --git a/interfaces/informer/interface.go b/interfaces/informer/interface.go index a8d32841..b975edd7 100644 --- a/interfaces/informer/interface.go +++ b/interfaces/informer/interface.go @@ -1,8 +1,10 @@ package informer -import "github.com/spiral/roadrunner/v2" +import ( + "github.com/spiral/roadrunner/v2/interfaces/worker" +) // Informer used to get workers from particular plugin or set of plugins type Informer interface { - Workers() []roadrunner.WorkerBase + Workers() []worker.BaseProcess } diff --git a/pool.go b/interfaces/pool/pool.go index 3e38c4cb..a1015fd6 100755..100644 --- a/pool.go +++ b/interfaces/pool/pool.go @@ -1,76 +1,40 @@ -package roadrunner +package pool import ( "context" "runtime" "time" - "github.com/spiral/roadrunner/v2/util" -) - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event int64 - - // Payload depends on event type, typically it's worker or error. - Payload interface{} -} - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct = iota + 7800 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventPoolError caused on pool wide errors. - EventPoolError - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // todo: EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError - EventTTL - - // todo: EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" ) // Pool managed set of inner worker processes. type Pool interface { // AddListener connects event listener to the pool. - AddListener(listener util.EventListener) + AddListener(listener events.EventListener) // GetConfig returns pool configuration. - GetConfig() PoolConfig + GetConfig() interface{} // Exec - Exec(rqs Payload) (Payload, error) + Exec(rqs internal.Payload) (internal.Payload, error) - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) + ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) // Workers returns worker list associated with the pool. - Workers() (workers []WorkerBase) + Workers() (workers []worker.BaseProcess) // Remove worker from the pool. - RemoveWorker(worker WorkerBase) error + RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) } // Configures the pool behaviour. -type PoolConfig struct { +type Config struct { // Debug flag creates new fresh worker before every request. Debug bool @@ -96,7 +60,7 @@ type PoolConfig struct { } // InitDefaults enables default config values. -func (cfg *PoolConfig) InitDefaults() { +func (cfg *Config) InitDefaults() { if cfg.NumWorkers == 0 { cfg.NumWorkers = int64(runtime.NumCPU()) } diff --git a/interfaces/server/interface.go b/interfaces/server/interface.go index 2dae30c5..c50848d7 100644 --- a/interfaces/server/interface.go +++ b/interfaces/server/interface.go @@ -4,7 +4,9 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" ) type Env map[string]string @@ -12,6 +14,6 @@ type Env map[string]string // Server creates workers for the application. type Server interface { CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env Env) (roadrunner.Pool, error) + NewWorker(ctx context.Context, env Env) (worker.BaseProcess, error) + NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env) (pool.Pool, error) } diff --git a/factory.go b/interfaces/worker/factory.go index 482d39f8..19e2bf5d 100755..100644 --- a/factory.go +++ b/interfaces/worker/factory.go @@ -1,4 +1,4 @@ -package roadrunner +package worker import ( "context" @@ -7,12 +7,12 @@ import ( // Factory is responsible of wrapping given command into tasks WorkerProcess. type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorkerWithContext(context.Context, *exec.Cmd) (WorkerBase, error) - - SpawnWorker(*exec.Cmd) (WorkerBase, error) - + SpawnWorker(*exec.Cmd) (BaseProcess, error) // Close the factory and underlying connections. Close(ctx context.Context) error } diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go new file mode 100644 index 00000000..ce2c1c5a --- /dev/null +++ b/interfaces/worker/watcher.go @@ -0,0 +1,26 @@ +package worker + +import "context" + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []BaseProcess) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (BaseProcess, error) + + // PutWorker enqueues worker back + PushWorker(w BaseProcess) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []BaseProcess + + // RemoveWorker remove worker from the stack + RemoveWorker(wb BaseProcess) error +} diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go new file mode 100644 index 00000000..edbc68d9 --- /dev/null +++ b/interfaces/worker/worker.go @@ -0,0 +1,62 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/internal" +) + +// Allocator is responsible for worker allocation in the pool +type Allocator func() (BaseProcess, error) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // AddListener attaches listener to consume worker events. + AddListener(listener events.EventListener) + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop(ctx context.Context) error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() goridge.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl goridge.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs internal.Payload) (internal.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) +} diff --git a/payload.go b/internal/payload.go index 502dfadd..63983bad 100755 --- a/payload.go +++ b/internal/payload.go @@ -1,4 +1,4 @@ -package roadrunner +package internal // Payload carries binary header and body to stack and // back to the server. diff --git a/protocol.go b/internal/protocol.go index ee2d8245..5aa681eb 100755 --- a/protocol.go +++ b/internal/protocol.go @@ -1,4 +1,4 @@ -package roadrunner +package internal import ( "os" @@ -10,7 +10,7 @@ import ( var json = j.ConfigCompatibleWithStandardLibrary -type stopCommand struct { +type StopCommand struct { Stop bool `json:"stop"` } @@ -18,7 +18,7 @@ type pidCommand struct { Pid int `json:"pid"` } -func sendControl(rl goridge.Relay, v interface{}) error { +func SendControl(rl goridge.Relay, v interface{}) error { const op = errors.Op("send control frame") frame := goridge.NewFrame() frame.WriteVersion(goridge.VERSION_1) @@ -58,9 +58,9 @@ func sendControl(rl goridge.Relay, v interface{}) error { return nil } -func fetchPID(rl goridge.Relay) (int64, error) { +func FetchPID(rl goridge.Relay) (int64, error) { const op = errors.Op("fetchPID") - err := sendControl(rl, pidCommand{Pid: os.Getpid()}) + err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { return 0, errors.E(op, err) } diff --git a/state.go b/internal/state.go index 45e90f96..8f7d939b 100755 --- a/state.go +++ b/internal/state.go @@ -1,4 +1,4 @@ -package roadrunner +package internal import ( "fmt" @@ -8,10 +8,9 @@ import ( // State represents WorkerProcess status and updated time. type State interface { fmt.Stringer - - // Value returns state value + // Value returns WorkerState value Value() int64 - // Set sets the state + // Set sets the WorkerState Set(value int64) // NumJobs shows how many times WorkerProcess was invoked NumExecs() int64 @@ -49,13 +48,13 @@ const ( // StateStopped - process has been terminated. StateStopped - // StateErrored - error state (can't be used). + // StateErrored - error WorkerState (can't be used). StateErrored StateRemove ) -type state struct { +type WorkerState struct { value int64 numExecs int64 // to be lightweight, use UnixNano @@ -63,12 +62,12 @@ type state struct { } // Thread safe -func newState(value int64) *state { - return &state{value: value} +func NewWorkerState(value int64) *WorkerState { + return &WorkerState{value: value} } -// String returns current state as string. -func (s *state) String() string { +// String returns current WorkerState as string. +func (s *WorkerState) String() string { switch s.Value() { case StateInactive: return "inactive" @@ -88,36 +87,36 @@ func (s *state) String() string { } // NumExecs returns number of registered WorkerProcess execs. -func (s *state) NumExecs() int64 { +func (s *WorkerState) NumExecs() int64 { return atomic.LoadInt64(&s.numExecs) } -// Value state returns state value -func (s *state) Value() int64 { +// Value WorkerState returns WorkerState value +func (s *WorkerState) Value() int64 { return atomic.LoadInt64(&s.value) } // IsActive returns true if WorkerProcess not Inactive or Stopped -func (s *state) IsActive() bool { +func (s *WorkerState) IsActive() bool { val := s.Value() return val == StateWorking || val == StateReady } -// change state value (status) -func (s *state) Set(value int64) { +// change WorkerState value (status) +func (s *WorkerState) Set(value int64) { atomic.StoreInt64(&s.value, value) } // register new execution atomically -func (s *state) RegisterExec() { +func (s *WorkerState) RegisterExec() { atomic.AddInt64(&s.numExecs, 1) } // Update last used time -func (s *state) SetLastUsed(lu uint64) { +func (s *WorkerState) SetLastUsed(lu uint64) { atomic.StoreUint64(&s.lastUsed, lu) } -func (s *state) LastUsed() uint64 { +func (s *WorkerState) LastUsed() uint64 { return atomic.LoadUint64(&s.lastUsed) } diff --git a/internal/state_test.go b/internal/state_test.go new file mode 100755 index 00000000..bdb05825 --- /dev/null +++ b/internal/state_test.go @@ -0,0 +1,27 @@ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewState(t *testing.T) { + st := NewWorkerState(StateErrored) + + assert.Equal(t, "errored", st.String()) + + assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(StateReady).String()) + assert.Equal(t, "working", NewWorkerState(StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "undefined", NewWorkerState(1000).String()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, NewWorkerState(StateInactive).IsActive()) + assert.True(t, NewWorkerState(StateReady).IsActive()) + assert.True(t, NewWorkerState(StateWorking).IsActive()) + assert.False(t, NewWorkerState(StateStopped).IsActive()) + assert.False(t, NewWorkerState(StateErrored).IsActive()) +} diff --git a/pipe_factory_test.go b/pipe_factory_test.go deleted file mode 100755 index c742f522..00000000 --- a/pipe_factory_test.go +++ /dev/null @@ -1,238 +0,0 @@ -package roadrunner - -import ( - "context" - "os/exec" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func Test_Pipe_Start(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop(ctx)) -} - -func Test_Pipe_StartError(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError2(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot(t *testing.T) { - cmd := exec.Command("php", "tests/failboot.php") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") -} - -func Test_Pipe_Invalid(t *testing.T) { - cmd := exec.Command("php", "tests/invalid.php") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - sw, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Broken(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop(ctx) - assert.Error(t, err) - }() - - sw, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - res, err := sw.Exec(Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) -} - -func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithContext(context.Background(), cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop(context.Background()) - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd) - sw, err := NewSyncWorker(w) - if err != nil { - b.Fatal(err) - } - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop(context.Background()) - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop(ctx) - if err != nil { - b.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - sw, err := NewSyncWorker(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop(ctx) - if err != nil { - b.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - sw, err := NewSyncWorker(w) - if err != nil { - b.Fatal(err) - } - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100755 index 00000000..92dc103a --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,38 @@ +package events + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/interfaces/events" +) + +// EventHandler helps to broadcast events to multiple listeners. +type EventHandler struct { + listeners []events.EventListener + sync.RWMutex +} + +func NewEventsHandler() events.Handler { + return &EventHandler{listeners: make([]events.EventListener, 0, 2)} +} + +// NumListeners returns number of event listeners. +func (eb *EventHandler) NumListeners() int { + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *EventHandler) AddListener(listener events.EventListener) { + eb.Lock() + defer eb.Unlock() + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *EventHandler) Push(e interface{}) { + eb.Lock() + defer eb.Unlock() + for k := range eb.listeners { + eb.listeners[k](e) + } +} diff --git a/pipe_factory.go b/pkg/pipe/pipe_factory.go index db00c989..c86d78c4 100755 --- a/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -1,4 +1,4 @@ -package roadrunner +package pipe import ( "context" @@ -6,33 +6,36 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) -// PipeFactory connects to stack using standard +// Factory connects to stack using standard // streams (STDIN, STDOUT pipes). -type PipeFactory struct{} +type Factory struct{} // NewPipeFactory returns new factory instance and starts // listening // todo: review tests -func NewPipeFactory() Factory { - return &PipeFactory{} +func NewPipeFactory() worker.Factory { + return &Factory{} } type SpawnResult struct { - w WorkerBase + w worker.BaseProcess err error } -// SpawnWorker creates new WorkerProcess and connects it to goridge relay, +// SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) { +func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { - w, err := InitBaseWorker(cmd) + w, err := workerImpl.InitBaseWorker(cmd) if err != nil { c <- SpawnResult{ w: nil, @@ -76,7 +79,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) } // errors bundle - pid, err := fetchPID(relay) + pid, err := internal.FetchPID(relay) if pid != w.Pid() || err != nil { err = multierr.Combine( err, @@ -91,7 +94,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) } // everything ok, set ready state - w.State().Set(StateReady) + w.State().Set(internal.StateReady) // return worker c <- SpawnResult{ @@ -111,9 +114,9 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) } } -func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { const op = errors.Op("spawn worker") - w, err := InitBaseWorker(cmd) + w, err := workerImpl.InitBaseWorker(cmd) if err != nil { return nil, errors.E(op, err) } @@ -141,7 +144,7 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { } // errors bundle - if pid, err := fetchPID(relay); pid != w.Pid() { + if pid, err := internal.FetchPID(relay); pid != w.Pid() { err = multierr.Combine( err, w.Kill(), @@ -151,11 +154,11 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { } // everything ok, set ready state - w.State().Set(StateReady) + w.State().Set(internal.StateReady) return w, nil } // Close the factory. -func (f *PipeFactory) Close(ctx context.Context) error { +func (f *Factory) Close(ctx context.Context) error { return nil } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go new file mode 100755 index 00000000..99212ff8 --- /dev/null +++ b/pkg/pipe/pipe_factory_test.go @@ -0,0 +1,511 @@ +package pipe + +import ( + "context" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/internal" + workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, internal.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Kill(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + // TODO changed from stopped, discuss + assert.Equal(t, internal.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop(ctx)) +} + +func Test_Pipe_StartError(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot(t *testing.T) { + cmd := exec.Command("php", "../../tests/failboot.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Pipe_Invalid(t *testing.T) { + cmd := exec.Command("php", "../../tests/invalid.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop(ctx) + assert.Error(t, err) + }() + + sw, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorkerWithContext(context.Background(), cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop(context.Background()) + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd) + sw, err := workerImpl.From(w) + if err != nil { + b.Fatal(err) + } + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop(context.Background()) + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop(ctx) + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw, err := workerImpl.From(w) + if err != nil { + b.Fatal(err) + } + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop(ctx) + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw, err := workerImpl.From(w) + if err != nil { + b.Fatal(err) + } + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + syncWorker, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, syncWorker.Wait()) + }() + defer func() { + err := syncWorker.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + + syncWorker, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + go func() { + assert.NoError(t, syncWorker.Wait()) + }() + defer func() { + err := syncWorker.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := syncWorker.Exec(internal.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + syncWorker, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + data := "" + mu := &sync.Mutex{} + w.AddListener(func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + }) + + syncWorker, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop(ctx)) +} + +func Test_Error(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + syncWorker, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.ErrSoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello") +} + +func Test_NumExecs(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + syncWorker, err := workerImpl.From(w) + if err != nil { + t.Fatal(err) + } + + _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(1), w.State().NumExecs()) + + _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(2), w.State().NumExecs()) + + _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(3), w.State().NumExecs()) +} diff --git a/pkg/pool/config.go b/pkg/pool/config.go new file mode 100644 index 00000000..3dcc3584 --- /dev/null +++ b/pkg/pool/config.go @@ -0,0 +1,75 @@ +package pool + +import ( + "runtime" + "time" +) + +// Configures the pool behaviour. +type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers int64 + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs int64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig +} + +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = int64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick uint64 + + // TTL defines maximum time worker is allowed to live. + TTL uint64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL uint64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL uint64 + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = 1 + } +} diff --git a/static_pool.go b/pkg/pool/static_pool.go index fbb2e5e8..6cc42143 100755 --- a/static_pool.go +++ b/pkg/pool/static_pool.go @@ -1,11 +1,17 @@ -package roadrunner +package pool import ( "context" "os/exec" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + events2 "github.com/spiral/roadrunner/v2/pkg/events" + syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" + workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) // StopRequest can be sent by worker to indicate that restart is required. @@ -13,38 +19,35 @@ const StopRequest = "{\"stop\":true}" var bCtx = context.Background() -// Allocator is responsible for worker allocation in the pool -type Allocator func() (WorkerBase, error) - // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w WorkerBase) (Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error) -// PoolBefore is set of functions that executes BEFORE Exec -type Before func(req Payload) Payload +// Before is set of functions that executes BEFORE Exec +type Before func(req internal.Payload) internal.Payload -// PoolAfter is set of functions that executes AFTER Exec -type After func(req Payload, resp Payload) Payload +// After is set of functions that executes AFTER Exec +type After func(req internal.Payload, resp internal.Payload) internal.Payload -type PoolOptions func(p *StaticPool) +type Options func(p *StaticPool) // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - cfg PoolConfig + cfg Config // worker command creator cmd func() *exec.Cmd // creates and connects to stack - factory Factory + factory worker.Factory // distributes the events - events util.EventsHandler + events events.Handler // manages worker states and TTLs - ww WorkerWatcher + ww worker.Watcher // allocate new worker - allocator Allocator + allocator worker.Allocator errEncoder ErrorEncoder before []Before @@ -52,7 +55,7 @@ type StaticPool struct { } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) { const op = errors.Op("NewPool") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) @@ -68,13 +71,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo cfg: cfg, cmd: cmd, factory: factory, - events: util.NewEventsHandler(), + events: events2.NewEventsHandler(), after: make([]After, 0, 0), before: make([]Before, 0, 0), } p.allocator = newPoolAllocator(factory, cmd) - p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) if err != nil { @@ -105,38 +108,38 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo return p, nil } -func PoolBefore(before ...Before) PoolOptions { +func ExecBefore(before ...Before) Options { return func(p *StaticPool) { p.before = append(p.before, before...) } } -func PoolAfter(after ...After) PoolOptions { +func ExecAfter(after ...After) Options { return func(p *StaticPool) { p.after = append(p.after, after...) } } // AddListener connects event listener to the pool. -func (sp *StaticPool) AddListener(listener util.EventListener) { +func (sp *StaticPool) AddListener(listener events.EventListener) { sp.events.AddListener(listener) } -// PoolConfig returns associated pool configuration. Immutable. -func (sp *StaticPool) GetConfig() PoolConfig { +// Config returns associated pool configuration. Immutable. +func (sp *StaticPool) GetConfig() interface{} { return sp.cfg } // Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []WorkerBase) { +func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { return sp.ww.WorkersList() } -func (sp *StaticPool) RemoveWorker(wb WorkerBase) error { +func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { return sp.ww.RemoveWorker(wb) } -func (sp *StaticPool) Exec(p Payload) (Payload, error) { +func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { const op = errors.Op("exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -145,10 +148,10 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } - sw := w.(SyncWorker) + sw := w.(worker.SyncWorker) if len(sp.before) > 0 { for i := 0; i < len(sp.before); i++ { @@ -164,10 +167,10 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { // worker want's to be terminated // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { - sw.State().Set(StateInvalid) + sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(p) @@ -176,7 +179,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -191,16 +194,16 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { return rsp, nil } -func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { const op = errors.Op("exec with context") ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } - sw := w.(SyncWorker) + sw := w.(worker.SyncWorker) // apply all before function if len(sp.before) > 0 { @@ -216,10 +219,10 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - sw.State().Set(StateInvalid) + sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(rqs) @@ -228,7 +231,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -244,13 +247,13 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload return rsp, nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { // GetFreeWorker function consumes context with timeout w, err := sp.ww.GetFreeWorker(ctxGetFree) if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) return nil, errors.E(op, err) } // else if err not nil - return error @@ -265,48 +268,48 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w WorkerBase) (Payload, error) { + return func(err error, w worker.BaseProcess) (internal.Payload, error) { const op = errors.Op("error encoder") // soft job errors are allowed if errors.Is(errors.ErrSoftJob, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } - w.State().Set(StateInvalid) + w.State().Set(internal.StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { sp.ww.PushWorker(w) } - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } - w.State().Set(StateInvalid) - sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + w.State().Set(internal.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { - return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS)) + return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) } - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } } -func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator { - return func() (WorkerBase, error) { +func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (worker.BaseProcess, error) { w, err := factory.SpawnWorkerWithContext(bCtx, cmd()) if err != nil { return nil, err } - sw, err := NewSyncWorker(w) + sw, err := syncWorker.From(w) if err != nil { return nil, err } @@ -314,25 +317,25 @@ func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator { } } -func (sp *StaticPool) execDebug(p Payload) (Payload, error) { +func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { sw, err := sp.allocator() if err != nil { - return EmptyPayload, err + return internal.Payload{}, err } - r, err := sw.(SyncWorker).Exec(p) + r, err := sw.(worker.SyncWorker).Exec(p) if stopErr := sw.Stop(context.Background()); stopErr != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) } return r, err } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { +func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") - var workers []WorkerBase + var workers []worker.BaseProcess // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { diff --git a/static_pool_test.go b/pkg/pool/static_pool_test.go index 33799c40..dd33a1a6 100755 --- a/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -1,4 +1,4 @@ -package roadrunner +package pool import ( "context" @@ -12,10 +12,13 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" ) -var cfg = PoolConfig{ +var cfg = Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -25,8 +28,8 @@ func Test_NewPool(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -39,8 +42,8 @@ func Test_NewPool(t *testing.T) { func Test_StaticPool_Invalid(t *testing.T) { p, err := NewPool( context.Background(), - func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, + pipe.NewPipeFactory(), cfg, ) @@ -51,9 +54,9 @@ func Test_StaticPool_Invalid(t *testing.T) { func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewPool( context.Background(), - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -67,8 +70,8 @@ func Test_StaticPool_Echo(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -77,7 +80,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -91,8 +94,8 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -101,7 +104,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -115,8 +118,8 @@ func Test_StaticPool_Echo_Context(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -125,7 +128,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -139,8 +142,8 @@ func Test_StaticPool_JobError(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -148,7 +151,7 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -165,18 +168,18 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) assert.NotNil(t, p) - block := make(chan struct{}) + block := make(chan struct{}, 1) p.AddListener(func(event interface{}) { - if wev, ok := event.(WorkerEvent); ok { - if wev.Event == EventWorkerLog { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerLog { e := string(wev.Payload.([]byte)) if strings.ContainsAny(e, "undefined_function()") { block <- struct{}{} @@ -186,7 +189,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { } }) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) @@ -200,8 +203,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -209,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -223,8 +226,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) p.AddListener(func(event interface{}) { - if pe, ok := event.(PoolEvent); ok { - if pe.Event == EventWorkerConstruct { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { wg.Done() } } @@ -240,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { list := p.Workers() for _, w := range list { - assert.Equal(t, StateReady, w.State().Value()) + assert.Equal(t, internal.StateReady, w.State().Value()) } wg.Wait() } @@ -248,9 +251,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewPool( context.Background(), - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, @@ -267,9 +270,9 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -284,11 +287,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(Payload{Body: []byte("hello")}) + res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -304,9 +307,9 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -320,14 +323,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - res, _ := p.Exec(Payload{Body: []byte("hello")}) + res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { assert.Len(t, p.Workers(), 0) - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -344,9 +347,9 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -360,14 +363,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(Payload{Body: []byte("hello")}) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -384,9 +387,9 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -397,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(Payload{Body: []byte("100")}) + _, err = p.Exec(internal.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -406,9 +409,9 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -419,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.Exec(Payload{Body: []byte("100")}) + _, err := p.Exec(internal.Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -427,7 +430,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.Exec(Payload{Body: []byte("100")}) + _, err = p.Exec(internal.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -436,9 +439,9 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { ctx := context.Background() p, err := NewPool( context.Background(), - func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -450,10 +453,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NotNil(t, p) for _, w := range p.Workers() { - w.State().Set(StateErrored) + w.State().Set(internal.StateErrored) } - _, err = p.Exec(Payload{Body: []byte("hello")}) + _, err = p.Exec(internal.Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -461,9 +464,9 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := NewPool( context.Background(), - func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -480,8 +483,8 @@ func Benchmark_Pool_Echo(b *testing.B) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), cfg, ) if err != nil { @@ -491,7 +494,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -502,9 +505,9 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx := context.Background() p, _ := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -517,7 +520,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -532,9 +535,9 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx := context.Background() p, _ := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), - PoolConfig{ + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -546,7 +549,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/supervisor_pool.go b/pkg/pool/supervisor_pool.go index dfec5559..6d1f0c58 100755 --- a/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -1,4 +1,4 @@ -package roadrunner +package pool import ( "context" @@ -6,27 +6,31 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" ) const MB = 1024 * 1024 -type SupervisedPool interface { - Pool +type Supervised interface { + pool.Pool // Start used to start watching process for all pool workers Start() } -type supervisedPool struct { +type supervised struct { cfg *SupervisorConfig - events util.EventsHandler - pool Pool + events events.Handler + pool pool.Pool stopCh chan struct{} mu *sync.RWMutex } -func newPoolWatcher(pool Pool, events util.EventsHandler, cfg *SupervisorConfig) SupervisedPool { - sp := &supervisedPool{ +func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ cfg: cfg, events: events, pool: pool, @@ -38,10 +42,10 @@ func newPoolWatcher(pool Pool, events util.EventsHandler, cfg *SupervisorConfig) type ttlExec struct { err error - p Payload + p internal.Payload } -func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { const op = errors.Op("exec_supervised") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -55,7 +59,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay if err != nil { c <- ttlExec{ err: errors.E(op, err), - p: EmptyPayload, + p: internal.Payload{}, } } @@ -68,10 +72,10 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay for { select { case <-ctx.Done(): - return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err()) + return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) case res := <-c: if res.err != nil { - return EmptyPayload, res.err + return internal.Payload{}, res.err } return res.p, nil @@ -79,38 +83,38 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay } } -func (sp *supervisedPool) Exec(p Payload) (Payload, error) { +func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) { const op = errors.Op("supervised exec") rsp, err := sp.pool.Exec(p) if err != nil { - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } return rsp, nil } -func (sp *supervisedPool) AddListener(listener util.EventListener) { +func (sp *supervised) AddListener(listener events.EventListener) { sp.pool.AddListener(listener) } -func (sp *supervisedPool) GetConfig() PoolConfig { +func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervisedPool) Workers() (workers []WorkerBase) { +func (sp *supervised) Workers() (workers []worker.BaseProcess) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() } -func (sp *supervisedPool) RemoveWorker(worker WorkerBase) error { +func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { return sp.pool.RemoveWorker(worker) } -func (sp *supervisedPool) Destroy(ctx context.Context) { +func (sp *supervised) Destroy(ctx context.Context) { sp.pool.Destroy(ctx) } -func (sp *supervisedPool) Start() { +func (sp *supervised) Start() { go func() { watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick)) for { @@ -128,11 +132,11 @@ func (sp *supervisedPool) Start() { }() } -func (sp *supervisedPool) Stop() { +func (sp *supervised) Stop() { sp.stopCh <- struct{}{} } -func (sp *supervisedPool) control() { +func (sp *supervised) control() { now := time.Now() const op = errors.Op("supervised pool control tick") @@ -140,11 +144,11 @@ func (sp *supervisedPool) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == StateInvalid { + if workers[i].State().Value() == internal.StateInvalid { continue } - s, err := WorkerProcessState(workers[i]) + s, err := roadrunner.WorkerProcessState(workers[i]) if err != nil { // worker not longer valid for supervision continue @@ -153,27 +157,27 @@ func (sp *supervisedPool) control() { if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue } // firs we check maxWorker idle if sp.cfg.IdleTTL != 0 { // then check for the worker state - if workers[i].State().Value() != StateReady { + if workers[i].State().Value() != internal.StateReady { continue } @@ -194,10 +198,10 @@ func (sp *supervisedPool) control() { if sp.cfg.IdleTTL-uint64(res) <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } } } diff --git a/supervisor_test.go b/pkg/pool/supervisor_test.go index d5d7d04c..2e3e7fd2 100644 --- a/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -1,4 +1,4 @@ -package roadrunner +package pool import ( "context" @@ -6,10 +6,13 @@ import ( "testing" "time" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" ) -var cfgSupervised = PoolConfig{ +var cfgSupervised = Config{ NumWorkers: int64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -26,8 +29,8 @@ func TestSupervisedPool_Exec(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/memleak.php", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), cfgSupervised, ) @@ -44,7 +47,7 @@ func TestSupervisedPool_Exec(t *testing.T) { default: workers := p.Workers() if len(workers) > 0 { - s, err := WorkerProcessState(workers[0]) + s, err := roadrunner.WorkerProcessState(workers[0]) assert.NoError(t, err) assert.NotNil(t, s) // since this is soft limit, double max memory limit watch @@ -58,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 50) - _, err = p.Exec(Payload{ + _, err = p.Exec(internal.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -69,7 +72,7 @@ func TestSupervisedPool_Exec(t *testing.T) { } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = PoolConfig{ + var cfgExecTTL = Config{ NumWorkers: int64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -84,8 +87,8 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), cfgExecTTL, ) @@ -95,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), Payload{ + resp, err := p.ExecWithContext(context.Background(), internal.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -109,7 +112,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { } func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = PoolConfig{ + var cfgExecTTL = Config{ NumWorkers: int64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -124,8 +127,8 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { ctx := context.Background() p, err := NewPool( ctx, - func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") }, - NewPipeFactory(), + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), cfgExecTTL, ) @@ -136,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(Payload{ + resp, err := p.Exec(internal.Payload{ Context: []byte(""), Body: []byte("foo"), }) diff --git a/socket_factory.go b/pkg/socket/socket_factory.go index e517c03f..f721ad66 100755 --- a/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -1,4 +1,4 @@ -package roadrunner +package socket import ( "context" @@ -9,14 +9,17 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) -// SocketFactory connects to external stack using socket server. -type SocketFactory struct { +// Factory connects to external stack using socket server. +type Factory struct { // listens for incoming connections from underlying processes ls net.Listener @@ -32,10 +35,10 @@ type SocketFactory struct { // todo: review -// NewSocketServer returns SocketFactory attached to a given socket listener. +// NewSocketServer returns Factory attached to a given socket listener. // tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) Factory { - f := &SocketFactory{ +func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory { + f := &Factory{ ls: ls, tout: tout, relays: sync.Map{}, @@ -53,7 +56,7 @@ func NewSocketServer(ls net.Listener, tout time.Duration) Factory { } // blocking operation, returns an error -func (f *SocketFactory) listen() error { +func (f *Factory) listen() error { errGr := &errgroup.Group{} errGr.Go(func() error { for { @@ -63,7 +66,7 @@ func (f *SocketFactory) listen() error { } rl := goridge.NewSocketRelay(conn) - pid, err := fetchPID(rl) + pid, err := internal.FetchPID(rl) if err != nil { return err } @@ -76,18 +79,18 @@ func (f *SocketFactory) listen() error { } type socketSpawn struct { - w WorkerBase + w worker.BaseProcess err error } -// SpawnWorker creates WorkerProcess and connects it to appropriate relay or returns error -func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) { +// SpawnWorker creates Process and connects it to appropriate relay or returns error +func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker_with_context") c := make(chan socketSpawn) go func() { ctx, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := InitBaseWorker(cmd) + w, err := workerImpl.InitBaseWorker(cmd) if err != nil { c <- socketSpawn{ w: nil, @@ -121,7 +124,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm } w.AttachRelay(rl) - w.State().Set(StateReady) + w.State().Set(internal.StateReady) c <- socketSpawn{ w: w, @@ -141,9 +144,9 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm } } -func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker") - w, err := InitBaseWorker(cmd) + w, err := workerImpl.InitBaseWorker(cmd) if err != nil { return nil, err } @@ -164,18 +167,18 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { } w.AttachRelay(rl) - w.State().Set(StateReady) + w.State().Set(internal.StateReady) return w, nil } // Close socket factory and underlying socket connection. -func (f *SocketFactory) Close(ctx context.Context) error { +func (f *Factory) Close(ctx context.Context) error { return f.ls.Close() } -// waits for WorkerProcess to connect over socket and returns associated relay of timeout -func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) { +// waits for Process to connect over socket and returns associated relay of timeout +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) { ticker := time.NewTicker(time.Millisecond * 100) for { select { @@ -196,7 +199,7 @@ func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) } } -func (f *SocketFactory) findRelay(w WorkerBase) (*goridge.SocketRelay, error) { +func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) { const op = errors.Op("find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) @@ -215,11 +218,11 @@ func (f *SocketFactory) findRelay(w WorkerBase) (*goridge.SocketRelay, error) { } // chan to store relay associated with specific pid -func (f *SocketFactory) attachRelayToPid(pid int64, relay goridge.Relay) { +func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) { f.relays.Store(pid, relay) } // deletes relay chan associated with specific pid -func (f *SocketFactory) removeRelayFromPid(pid int64) { +func (f *Factory) removeRelayFromPid(pid int64) { f.relays.Delete(pid) } diff --git a/socket_factory_test.go b/pkg/socket/socket_factory_test.go index bbe8cc31..f1a7d637 100755 --- a/socket_factory_test.go +++ b/pkg/socket/socket_factory_test.go @@ -1,4 +1,4 @@ -package roadrunner +package socket import ( "context" @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -27,7 +29,7 @@ func Test_Tcp_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) assert.NoError(t, err) @@ -39,7 +41,7 @@ func Test_Tcp_Start(t *testing.T) { err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) + t.Errorf("error stopping the Process: error %v", err) } } @@ -52,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") f := NewSocketServer(ls, time.Minute) defer func() { @@ -68,7 +70,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) + t.Errorf("error stopping the Process: error %v", err) } } @@ -87,7 +89,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") err = cmd.Start() if err != nil { t.Errorf("error executing the command: error %v", err) @@ -114,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/failboot.php") + cmd := exec.Command("php", "../../tests/failboot.php") w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) @@ -137,7 +139,7 @@ func Test_Tcp_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0") + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) @@ -160,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/invalid.php") + cmd := exec.Command("php", "../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) @@ -182,7 +184,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "broken", "tcp") + cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -204,12 +206,12 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err2) }() - sw, err := NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { t.Fatal(err) } - res, err := sw.Exec(Payload{Body: []byte("hello")}) + res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -231,7 +233,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) go func() { @@ -240,16 +242,16 @@ func Test_Tcp_Echo(t *testing.T) { defer func() { err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) + t.Errorf("error stopping the Process: error %v", err) } }() - sw, err := NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { t.Fatal(err) } - res, err := sw.Exec(Payload{Body: []byte("hello")}) + res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -273,7 +275,7 @@ func Test_Unix_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) assert.NoError(t, err) @@ -285,7 +287,7 @@ func Test_Unix_Start(t *testing.T) { err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) + t.Errorf("error stopping the Process: error %v", err) } } @@ -303,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/failboot.php") + cmd := exec.Command("php", "../../tests/failboot.php") w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) @@ -325,7 +327,7 @@ func Test_Unix_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/slow-client.php", "echo", "unix", "200", "0") + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) @@ -347,7 +349,7 @@ func Test_Unix_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/invalid.php") + cmd := exec.Command("php", "../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) @@ -368,7 +370,7 @@ func Test_Unix_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "broken", "unix") + cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -389,12 +391,12 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { t.Fatal(err) } - res, err := sw.Exec(Payload{Body: []byte("hello")}) + res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -416,7 +418,7 @@ func Test_Unix_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -428,16 +430,16 @@ func Test_Unix_Echo(t *testing.T) { defer func() { err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) + t.Errorf("error stopping the Process: error %v", err) } }() - sw, err := NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { t.Fatal(err) } - res, err := sw.Exec(Payload{Body: []byte("hello")}) + res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -463,7 +465,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") w, err := f.SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -475,7 +477,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the WorkerProcess: error %v", err) + b.Errorf("error stopping the Process: error %v", err) } } } @@ -494,7 +496,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -503,17 +505,17 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { defer func() { err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the WorkerProcess: error %v", err) + b.Errorf("error stopping the Process: error %v", err) } }() - sw, err := NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { b.Fatal(err) } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -535,7 +537,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") w, err := f.SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -543,7 +545,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { } err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the WorkerProcess: error %v", err) + b.Errorf("error stopping the Process: error %v", err) } } } @@ -562,7 +564,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) if err != nil { @@ -571,17 +573,17 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { defer func() { err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the WorkerProcess: error %v", err) + b.Errorf("error stopping the Process: error %v", err) } }() - sw, err := NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { b.Fatal(err) } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/sync_worker.go b/pkg/worker/sync_worker.go index 94a804a7..1eb1396e 100755 --- a/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -1,4 +1,4 @@ -package roadrunner +package worker import ( "bytes" @@ -6,110 +6,101 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" "go.uber.org/multierr" "github.com/spiral/goridge/v3" ) -var EmptyPayload = Payload{} - -type SyncWorker interface { - // WorkerBase provides basic functionality for the SyncWorker - WorkerBase - // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs Payload) (Payload, error) - // ExecWithContext used to handle Exec with TTL - ExecWithContext(ctx context.Context, p Payload) (Payload, error) -} - type syncWorker struct { - w WorkerBase + w worker.BaseProcess } -// NewSyncWorker creates SyncWorker from WorkerBasa -func NewSyncWorker(w WorkerBase) (SyncWorker, error) { +// From creates SyncWorker from WorkerBasa +func From(w worker.BaseProcess) (worker.SyncWorker, error) { return &syncWorker{ w: w, }, nil } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p Payload) (Payload, error) { +func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { const op = errors.Op("sync worker Exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return EmptyPayload, errors.E(op, errors.Str("payload can not be empty")) + return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.w.State().Value() != StateReady { - return EmptyPayload, errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())) + if tw.w.State().Value() != internal.StateReady { + return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) } // set last used time tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(StateWorking) + tw.w.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.ErrSoftJob, err) == false { - tw.w.State().Set(StateErrored) + tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } - return EmptyPayload, err + return internal.Payload{}, err } - tw.w.State().Set(StateReady) + tw.w.State().Set(internal.StateReady) tw.w.State().RegisterExec() return rsp, nil } type wexec struct { - payload Payload + payload internal.Payload err error } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) { +func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) { const op = errors.Op("ExecWithContext") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: EmptyPayload, + payload: internal.Payload{}, err: errors.E(op, errors.Str("payload can not be empty")), } return } - if tw.w.State().Value() != StateReady { + if tw.w.State().Value() != internal.StateReady { c <- wexec{ - payload: EmptyPayload, - err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())), + payload: internal.Payload{}, + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), } return } // set last used time tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(StateWorking) + tw.w.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.ErrSoftJob, err) == false { - tw.w.State().Set(StateErrored) + tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } c <- wexec{ - payload: EmptyPayload, + payload: internal.Payload{}, err: errors.E(op, err), } return } - tw.w.State().Set(StateReady) + tw.w.State().Set(internal.StateReady) tw.w.State().RegisterExec() c <- wexec{ @@ -122,18 +113,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, case <-ctx.Done(): err := multierr.Combine(tw.Kill()) if err != nil { - return EmptyPayload, multierr.Append(err, ctx.Err()) + return internal.Payload{}, multierr.Append(err, ctx.Err()) } - return EmptyPayload, ctx.Err() + return internal.Payload{}, ctx.Err() case res := <-c: if res.err != nil { - return EmptyPayload, res.err + return internal.Payload{}, res.err } return res.payload, nil } } -func (tw *syncWorker) execPayload(p Payload) (Payload, error) { +func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) { const op = errors.Op("exec payload") frame := goridge.NewFrame() @@ -156,35 +147,35 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { err := tw.Relay().Send(frame) if err != nil { - return EmptyPayload, err + return internal.Payload{}, err } frameR := goridge.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { - return EmptyPayload, errors.E(op, err) + return internal.Payload{}, errors.E(op, err) } if frameR == nil { - return EmptyPayload, errors.E(op, errors.Str("nil frame received")) + return internal.Payload{}, errors.E(op, errors.Str("nil frame received")) } if !frameR.VerifyCRC() { - return EmptyPayload, errors.E(op, errors.Str("failed to verify CRC")) + return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() if flags&byte(goridge.ERROR) != byte(0) { - return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) + return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() if len(options) != 1 { - return EmptyPayload, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := Payload{} + payload := internal.Payload{} payload.Context = frameR.Payload()[:options[0]] payload.Body = frameR.Payload()[options[0]:] @@ -203,11 +194,11 @@ func (tw *syncWorker) Created() time.Time { return tw.w.Created() } -func (tw *syncWorker) AddListener(listener util.EventListener) { +func (tw *syncWorker) AddListener(listener events.EventListener) { tw.w.AddListener(listener) } -func (tw *syncWorker) State() State { +func (tw *syncWorker) State() internal.State { return tw.w.State() } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go new file mode 100755 index 00000000..e224e105 --- /dev/null +++ b/pkg/worker/sync_worker_test.go @@ -0,0 +1,37 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/spiral/roadrunner/v2/internal" + "github.com/stretchr/testify/assert" +) + +func Test_NotStarted_String(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "inactive") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_NotStarted_Exec(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + + syncWorker, err := From(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "Process is not ready (inactive)") +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100755 index 00000000..35d3264e --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,302 @@ +package worker + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + events2 "github.com/spiral/roadrunner/v2/pkg/events" + "go.uber.org/multierr" +) + +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond + + // ReadBufSize used to make a slice with specified length to read from stderr + ReadBufSize = 10240 // Kb +) + +var syncPool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, ReadBufSize) + return &buf + }, +} + +// Process - supervised process with api over goridge.Relay. +type Process struct { + // created indicates at what time Process has been created. + created time.Time + + // updates parent supervisor or pool about Process events + events events.Handler + + // state holds information about current Process state, + // number of Process executions, buf status change time. + // publicly this object is receive-only and protected using Mutex + // and atomic counter. + state *internal.WorkerState + + // underlying command with associated process, command must be + // provided to Process from outside in non-started form. CmdSource + // stdErr direction will be handled by Process to aggregate error message. + cmd *exec.Cmd + + // pid of the process, points to pid of underlying process and + // can be nil while process is not started. + pid int + + // stderr aggregates stderr output from underlying process. Value can be + // receive only once command is completed and all pipes are closed. + stderr *bytes.Buffer + + // channel is being closed once command is complete. + // waitDone chan interface{} + + // contains information about resulted process state. + endState *os.ProcessState + + // ensures than only one execution can be run at once. + mu sync.RWMutex + + // communication bus with underlying process. + relay goridge.Relay + // rd in a second part of pipe to read from stderr + rd io.Reader + // stop signal terminates io.Pipe from reading from stderr + stop chan struct{} +} + +// InitBaseWorker creates new Process over given exec.cmd. +func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { + if cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + w := &Process{ + created: time.Now(), + events: events2.NewEventsHandler(), + cmd: cmd, + state: internal.NewWorkerState(internal.StateInactive), + stderr: new(bytes.Buffer), + stop: make(chan struct{}, 1), + } + + w.rd, w.cmd.Stderr = io.Pipe() + + // small buffer optimization + // at this point we know, that stderr will contain huge messages + w.stderr.Grow(ReadBufSize) + + go func() { + w.watch() + }() + + return w, nil +} + +// Pid returns worker pid. +func (w *Process) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. +func (w *Process) Created() time.Time { + return w.created +} + +// AddListener registers new worker event listener. +func (w *Process) AddListener(listener events.EventListener) { + w.events.AddListener(listener) +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) State() internal.State { + return w.state +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) AttachRelay(rl goridge.Relay) { + w.relay = rl +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) Relay() goridge.Relay { + return w.relay +} + +// String returns Process description. fmt.Stringer interface +func (w *Process) String() string { + st := w.state.String() + // we can safely compare pid to 0 + if w.pid != 0 { + st = st + ", pid:" + strconv.Itoa(w.pid) + } + + return fmt.Sprintf( + "(`%s` [%s], numExecs: %v)", + strings.Join(w.cmd.Args, " "), + st, + w.state.NumExecs(), + ) +} + +func (w *Process) Start() error { + err := w.cmd.Start() + if err != nil { + return err + } + w.pid = w.cmd.Process.Pid + return nil +} + +// Wait must be called once for each Process, call will be released once Process is +// complete and will return process error (if any), if stderr is presented it's value +// will be wrapped as WorkerError. Method will return error code if php process fails +// to find or Start the script. +func (w *Process) Wait() error { + const op = errors.Op("worker process wait") + err := multierr.Combine(w.cmd.Wait()) + + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then w.cmd.Wait return an error + w.endState = w.cmd.ProcessState + if err != nil { + w.state.Set(internal.StateErrored) + + w.mu.RLock() + // if process return code > 0, here will be an error from stderr (if presents) + if w.stderr.Len() > 0 { + err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} + } + w.mu.RUnlock() + + return multierr.Append(err, w.closeRelay()) + } + + err = multierr.Append(err, w.closeRelay()) + if err != nil { + w.state.Set(internal.StateErrored) + return err + } + + if w.endState.Success() { + w.state.Set(internal.StateStopped) + } + + return nil +} + +func (w *Process) closeRelay() error { + if w.relay != nil { + err := w.relay.Close() + if err != nil { + return err + } + } + return nil +} + +// Stop sends soft termination command to the Process and waits for process completion. +func (w *Process) Stop(ctx context.Context) error { + c := make(chan error) + + go func() { + var err error + w.state.Set(internal.StateStopping) + err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + if err != nil { + w.state.Set(internal.StateKilling) + c <- multierr.Append(err, w.cmd.Process.Kill()) + } + w.state.Set(internal.StateStopped) + c <- nil + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-c: + if err != nil { + return err + } + return nil + } +} + +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr. Does not waits for process completion! +func (w *Process) Kill() error { + w.state.Set(internal.StateKilling) + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + w.state.Set(internal.StateStopped) + return nil +} + +// put the pointer, to not allocate new slice +// but erase it len and then return back +func (w *Process) put(data *[]byte) { + *data = (*data)[:0] + *data = (*data)[:cap(*data)] + + syncPool.Put(data) +} + +// get pointer to the byte slice +func (w *Process) get() *[]byte { + return syncPool.Get().(*[]byte) +} + +// Write appends the contents of pool to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of pool; errBuffer is always nil. +func (w *Process) watch() { + go func() { + for { + select { + case <-w.stop: + buf := w.get() + // read the last data + n, _ := w.rd.Read(*buf) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + return + default: + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(*buf) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + } + } + }() +} diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go new file mode 100755 index 00000000..805f66b5 --- /dev/null +++ b/pkg/worker/worker_test.go @@ -0,0 +1,19 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_OnStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + assert.Nil(t, cmd.Start()) + + w, err := InitBaseWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "can't attach to running process", err.Error()) +} diff --git a/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index f8fb67a9..8788e509 100755 --- a/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -1,4 +1,4 @@ -package roadrunner +package worker_watcher //nolint:golint,stylecheck import ( "context" @@ -7,11 +7,14 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []WorkerBase + workers []worker.BaseProcess mutex sync.RWMutex destroy bool actualNumOfWorkers int64 @@ -20,7 +23,7 @@ type Stack struct { func NewWorkersStack() *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]WorkerBase, 0, w), + workers: make([]worker.BaseProcess, 0, w), actualNumOfWorkers: 0, } } @@ -34,7 +37,7 @@ func (stack *Stack) Reset() { // Push worker back to the stack // If stack in destroy state, Push will provide 100ms window to unlock the mutex -func (stack *Stack) Push(w WorkerBase) { +func (stack *Stack) Push(w worker.BaseProcess) { stack.mutex.Lock() defer stack.mutex.Unlock() stack.actualNumOfWorkers++ @@ -47,7 +50,7 @@ func (stack *Stack) IsEmpty() bool { return len(stack.workers) == 0 } -func (stack *Stack) Pop() (WorkerBase, bool) { +func (stack *Stack) Pop() (worker.BaseProcess, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() @@ -84,14 +87,13 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []WorkerBase { +func (stack *Stack) Workers() []worker.BaseProcess { stack.mutex.Lock() defer stack.mutex.Unlock() - workersCopy := make([]WorkerBase, 0, 1) + workersCopy := make([]worker.BaseProcess, 0, 1) // copy for _, v := range stack.workers { - sw := v.(SyncWorker) - workersCopy = append(workersCopy, sw) + workersCopy = append(workersCopy, v) } return workersCopy @@ -126,7 +128,7 @@ func (stack *Stack) Destroy(ctx context.Context) { stack.mutex.Lock() for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(StateDestroyed) + stack.workers[i].State().Set(internal.StateDestroyed) } stack.mutex.Unlock() tt.Stop() @@ -137,31 +139,8 @@ func (stack *Stack) Destroy(ctx context.Context) { } } -type WorkerWatcher interface { - // AddToWatch used to add stack to wait its state - AddToWatch(workers []WorkerBase) error - - // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (WorkerBase, error) - - // PutWorker enqueues worker back - PushWorker(w WorkerBase) - - // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew() error - - // Destroy destroys the underlying stack - Destroy(ctx context.Context) - - // WorkersList return all stack w/o removing it from internal storage - WorkersList() []WorkerBase - - // RemoveWorker remove worker from the stack - RemoveWorker(wb WorkerBase) error -} - // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func newWorkerWatcher(allocator Allocator, numWorkers int64, events util.EventsHandler) WorkerWatcher { +func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, @@ -176,29 +155,29 @@ func newWorkerWatcher(allocator Allocator, numWorkers int64, events util.EventsH type workerWatcher struct { mutex sync.RWMutex stack *Stack - allocator Allocator + allocator worker.Allocator initialNumWorkers int64 actualNumWorkers int64 - events util.EventsHandler + events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []WorkerBase) error { +func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - sw, err := NewSyncWorker(workers[i]) + sw, err := syncWorker.From(workers[i]) if err != nil { return err } ww.stack.Push(sw) sw.AddListener(ww.events.Push) - go func(swc WorkerBase) { + go func(swc worker.BaseProcess) { ww.wait(swc) }(sw) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("GetFreeWorker") // thread safe operation w, stop := ww.stack.Pop() @@ -208,7 +187,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == StateRemove { + if w != nil && w.State().Value() == internal.StateRemove { err := ww.RemoveWorker(w) if err != nil { return nil, err @@ -250,15 +229,15 @@ func (ww *workerWatcher) AllocateNew() error { ww.stack.mutex.Unlock() ww.PushWorker(sw) - ww.events.Push(PoolEvent{ - Event: EventWorkerConstruct, + ww.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, Payload: sw, }) return nil } -func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { +func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -266,7 +245,7 @@ func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(StateInvalid) + wb.State().Set(internal.StateInvalid) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -274,12 +253,12 @@ func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { return nil } - wb.State().Set(StateRemove) + wb.State().Set(internal.StateRemove) return nil } // O(1) operation -func (ww *workerWatcher) PushWorker(w WorkerBase) { +func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -292,22 +271,22 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []WorkerBase { +func (ww *workerWatcher) WorkersList() []worker.BaseProcess { return ww.stack.Workers() } -func (ww *workerWatcher) wait(w WorkerBase) { +func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("process wait") err := w.Wait() if err != nil { - ww.events.Push(WorkerEvent{ - Event: EventWorkerError, + ww.events.Push(events.WorkerEvent{ + Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err), }) } - if w.State().Value() == StateDestroyed { + if w.State().Value() == internal.StateDestroyed { // worker was manually destroyed, no need to replace return } @@ -315,14 +294,14 @@ func (ww *workerWatcher) wait(w WorkerBase) { _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { - ww.events.Push(PoolEvent{ - Event: EventPoolError, + ww.events.Push(events.PoolEvent{ + Event: events.EventPoolError, Payload: errors.E(op, err), }) } } -func (ww *workerWatcher) addToWatch(wb WorkerBase) { +func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { ww.mutex.Lock() defer ww.mutex.Unlock() go func() { diff --git a/plugins/http/config.go b/plugins/http/config.go index d6efe310..00d2940b 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -8,7 +8,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" ) type Cidrs []*net.IPNet @@ -56,7 +56,7 @@ type Config struct { Uploads *UploadsConfig // Pool configures worker pool. - Pool *roadrunner.PoolConfig + Pool *poolImpl.Config // Env is environment variables passed to the http pool Env map[string]string @@ -141,7 +141,7 @@ func (c *Config) EnableFCGI() bool { func (c *Config) InitDefaults() error { if c.Pool == nil { // default pool - c.Pool = &roadrunner.PoolConfig{ + c.Pool = &poolImpl.Config{ Debug: false, NumWorkers: int64(runtime.NumCPU()), MaxJobs: 1000, diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 74b038ff..57590bfd 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,9 +10,9 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/roadrunner/v2/interfaces/pool" ) const ( @@ -23,8 +23,10 @@ const ( EventError ) +const MB = 1024 * 1024 + type Handle interface { - AddListener(l util.EventListener) + AddListener(l events.EventListener) ServeHTTP(w http.ResponseWriter, r *http.Request) } @@ -71,17 +73,17 @@ type handler struct { uploads UploadsConfig trusted Cidrs log log.Logger - pool roadrunner.Pool + pool pool.Pool mul sync.Mutex - lsn util.EventListener + lsn events.EventListener } -func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool roadrunner.Pool) (Handle, error) { +func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) { if pool == nil { return nil, errors.E(errors.Str("pool should be initialized")) } return &handler{ - maxRequestSize: maxReqSize * roadrunner.MB, + maxRequestSize: maxReqSize * MB, uploads: uploads, pool: pool, trusted: trusted, @@ -89,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool ro } // Listen attaches handler event controller. -func (h *handler) AddListener(l util.EventListener) { +func (h *handler) AddListener(l events.EventListener) { h.mul.Lock() defer h.mul.Unlock() diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 13299da1..460263f6 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -15,10 +15,13 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" - factory "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/interfaces/status" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/util" @@ -47,17 +50,17 @@ type Plugin struct { sync.Mutex configurer config.Configurer - server factory.Server + server server.Server log log.Logger cfg *Config // middlewares to chain mdwr middleware - // Event listener to stdout - listener util.EventListener + // WorkerEvent listener to stdout + listener events.EventListener // Pool which attached to all servers - pool roadrunner.Pool + pool pool.Pool // servers RR handler handler Handle @@ -69,7 +72,7 @@ type Plugin struct { } // AddListener attaches server event controller. -func (s *Plugin) AddListener(listener util.EventListener) { +func (s *Plugin) AddListener(listener events.EventListener) { // save listeners for Reset s.listener = listener s.pool.AddListener(listener) @@ -77,7 +80,7 @@ func (s *Plugin) AddListener(listener util.EventListener) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server server.Server) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(PluginName, &s.cfg) if err != nil { @@ -97,7 +100,7 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Serv return errors.E(op, errors.Disabled) } - s.pool, err = server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, @@ -122,8 +125,8 @@ func (s *Plugin) logCallback(event interface{}) { s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr) case ErrorEvent: s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error) - case roadrunner.WorkerEvent: - s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.State()) + case events.WorkerEvent: + s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.(worker.BaseProcess).State()) default: fmt.Println(event) } @@ -284,7 +287,7 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Server returns associated pool workers -func (s *Plugin) Workers() []roadrunner.WorkerBase { +func (s *Plugin) Workers() []worker.BaseProcess { return s.pool.Workers() } @@ -305,7 +308,7 @@ func (s *Plugin) Reset() error { return errors.E(op, err) } - s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, diff --git a/plugins/http/request.go b/plugins/http/request.go index 640bdec2..5df79b7d 100644 --- a/plugins/http/request.go +++ b/plugins/http/request.go @@ -9,8 +9,8 @@ import ( "strings" j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/plugins/http/attributes" ) @@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (roadrunner.Payload, error) { - p := roadrunner.Payload{} +func (r *Request) Payload() (internal.Payload, error) { + p := internal.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return roadrunner.EmptyPayload, err + return internal.Payload{}, err } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return roadrunner.EmptyPayload, err + return internal.Payload{}, err } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/plugins/http/response.go b/plugins/http/response.go index e3ac2756..9700a16c 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/internal" ) // Response handles PSR7 response logic. @@ -23,7 +23,7 @@ type Response struct { } // NewResponse creates new response based on given pool payload. -func NewResponse(p roadrunner.Payload) (*Response, error) { +func NewResponse(p internal.Payload) (*Response, error) { r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { return nil, err diff --git a/plugins/http/tests/configs/.rr-http.yaml b/plugins/http/tests/configs/.rr-http.yaml index c6868f8c..e2e361cf 100644 --- a/plugins/http/tests/configs/.rr-http.yaml +++ b/plugins/http/tests/configs/.rr-http.yaml @@ -24,19 +24,6 @@ http: maxJobs: 0 allocateTimeout: 60s destroyTimeout: 60s - - ssl: - port: 8892 - redirect: false - cert: fixtures/server.crt - key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:7921 - http2: - enabled: false - h2c: false - maxConcurrentStreams: 128 logs: mode: development level: error diff --git a/plugins/http/tests/handler_test.go b/plugins/http/tests/handler_test.go index 0c6a39ef..54a4ae80 100644 --- a/plugins/http/tests/handler_test.go +++ b/plugins/http/tests/handler_test.go @@ -10,7 +10,8 @@ import ( "runtime" "strings" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" @@ -21,10 +22,10 @@ import ( ) func TestHandler_Echo(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -72,10 +73,10 @@ func Test_HandlerErrors(t *testing.T) { } func TestHandler_Headers(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "header", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -133,10 +134,10 @@ func TestHandler_Headers(t *testing.T) { } func TestHandler_Empty_User_Agent(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -193,10 +194,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) { } func TestHandler_User_Agent(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -253,10 +254,10 @@ func TestHandler_User_Agent(t *testing.T) { } func TestHandler_Cookies(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "cookie", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -318,10 +319,10 @@ func TestHandler_Cookies(t *testing.T) { } func TestHandler_JsonPayload_POST(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -382,10 +383,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) { } func TestHandler_JsonPayload_PUT(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -442,10 +443,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { } func TestHandler_JsonPayload_PATCH(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -502,10 +503,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { } func TestHandler_FormData_POST(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -575,10 +576,10 @@ func TestHandler_FormData_POST(t *testing.T) { } func TestHandler_FormData_POST_Overwrite(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -648,10 +649,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { } func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -720,10 +721,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { } func TestHandler_FormData_PUT(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -792,10 +793,10 @@ func TestHandler_FormData_PUT(t *testing.T) { } func TestHandler_FormData_PATCH(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -864,10 +865,10 @@ func TestHandler_FormData_PATCH(t *testing.T) { } func TestHandler_Multipart_POST(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -978,10 +979,10 @@ func TestHandler_Multipart_POST(t *testing.T) { } func TestHandler_Multipart_PUT(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1092,10 +1093,10 @@ func TestHandler_Multipart_PUT(t *testing.T) { } func TestHandler_Multipart_PATCH(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1208,10 +1209,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) { } func TestHandler_Error(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1254,10 +1255,10 @@ func TestHandler_Error(t *testing.T) { } func TestHandler_Error2(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error2", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1300,10 +1301,10 @@ func TestHandler_Error2(t *testing.T) { } func TestHandler_Error3(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "pid", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1359,10 +1360,10 @@ func TestHandler_Error3(t *testing.T) { } func TestHandler_ResponseDuration(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1420,10 +1421,10 @@ func TestHandler_ResponseDuration(t *testing.T) { } func TestHandler_ResponseDurationDelayed(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echoDelay", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1480,10 +1481,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { } func TestHandler_ErrorDuration(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1554,10 +1555,10 @@ func TestHandler_IP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1615,10 +1616,10 @@ func TestHandler_XRealIP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1681,10 +1682,10 @@ func TestHandler_XForwardedFor(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1746,10 +1747,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1794,10 +1795,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { } func BenchmarkHandler_Listen_Echo(b *testing.B) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index c8dd4b38..1a61597c 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -19,6 +19,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -821,7 +822,7 @@ func TestHttpMiddleware(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 15) + tt := time.NewTimer(time.Second * 20) go func() { defer wg.Done() @@ -900,7 +901,7 @@ func TestHttpEchoErr(t *testing.T) { mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1") mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any()) - mockLogger.EXPECT().Debug("worker event received", "event", roadrunner.EventWorkerLog, "worker state", gomock.Any()) + mockLogger.EXPECT().Debug("worker event received", "event", events.EventWorkerLog, "worker state", gomock.Any()) err = cont.RegisterAll( cfg, diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go index 2bfe7d56..a526fe03 100644 --- a/plugins/http/tests/response_test.go +++ b/plugins/http/tests/response_test.go @@ -6,7 +6,7 @@ import ( "net/http" "testing" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/internal" http2 "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" ) @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)}) + r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/plugins/http/tests/uploads_test.go b/plugins/http/tests/uploads_test.go index d36d4793..f255ec91 100644 --- a/plugins/http/tests/uploads_test.go +++ b/plugins/http/tests/uploads_test.go @@ -16,7 +16,8 @@ import ( "time" j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" ) @@ -26,10 +27,10 @@ var json = j.ConfigCompatibleWithStandardLibrary const testFile = "uploads_test.go" func TestHandler_Upload_File(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -109,10 +110,10 @@ func TestHandler_Upload_File(t *testing.T) { } func TestHandler_Upload_NestedFile(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -192,10 +193,10 @@ func TestHandler_Upload_NestedFile(t *testing.T) { } func TestHandler_Upload_File_NoTmpDir(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -275,10 +276,10 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { } func TestHandler_Upload_File_Forbids(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index f3013394..449be085 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -3,9 +3,9 @@ package informer import ( "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/informer" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/worker" ) const PluginName = "informer" @@ -21,8 +21,8 @@ func (p *Plugin) Init(log log.Logger) error { return nil } -// Workers provides WorkerBase slice with workers for the requested plugin -func (p *Plugin) Workers(name string) ([]roadrunner.WorkerBase, error) { +// Workers provides BaseProcess slice with workers for the requested plugin +func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) { const op = errors.Op("get workers") svc, ok := p.registry[name] if !ok { diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go index 473b6de7..3fdefde3 100644 --- a/plugins/informer/tests/test_plugin.go +++ b/plugins/informer/tests/test_plugin.go @@ -4,17 +4,18 @@ import ( "context" "time" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" ) -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, @@ -48,7 +49,7 @@ func (p1 *Plugin1) Name() string { return "informer.plugin1" } -func (p1 *Plugin1) Workers() []roadrunner.WorkerBase { +func (p1 *Plugin1) Workers() []worker.BaseProcess { pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) if err != nil { panic(err) diff --git a/plugins/reload/tests/reload_plugin_test.go b/plugins/reload/tests/reload_plugin_test.go index 8fb9474f..d2fad28d 100644 --- a/plugins/reload/tests/reload_plugin_test.go +++ b/plugins/reload/tests/reload_plugin_test.go @@ -386,7 +386,7 @@ func TestReloadCopy3k(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 180) + tt := time.NewTimer(time.Second * 220) go func() { defer wg.Done() @@ -429,7 +429,7 @@ func TestReloadCopy3k(t *testing.T) { t.Run("ReloadMake3kFiles", reloadMake3kFiles) ttt := time.Now() t.Run("ReloadCopyFiles", reloadCopyFiles) - if time.Since(ttt).Seconds() > 100 { + if time.Since(ttt).Seconds() > 120 { t.Fatal("spend too much time on copy") } diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go index 9f48a43f..1d770e70 100644 --- a/plugins/resetter/tests/test_plugin.go +++ b/plugins/resetter/tests/test_plugin.go @@ -4,17 +4,17 @@ import ( "context" "time" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/server" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" ) -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index ea6d42eb..e6003fbc 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,9 +8,14 @@ import ( "strings" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/socket" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) @@ -21,7 +26,7 @@ const PluginName = "server" type Plugin struct { cfg Config log log.Logger - factory roadrunner.Factory + factory worker.Factory } // Init application provider. @@ -93,7 +98,7 @@ func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { +func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.BaseProcess, error) { const op = errors.Op("new worker") spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -111,13 +116,13 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env server.Env) (pool.Pool, error) { spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, err } - p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt) + p, err := poolImpl.NewPool(ctx, spawnCmd, server.factory, opt) if err != nil { return nil, err } @@ -128,10 +133,10 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConf } // creates relay and worker factory. -func (server *Plugin) initFactory() (roadrunner.Factory, error) { +func (server *Plugin) initFactory() (worker.Factory, error) { const op = errors.Op("network factory init") if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { - return roadrunner.NewPipeFactory(), nil + return pipe.NewPipeFactory(), nil } dsn := strings.Split(server.cfg.Relay, "://") @@ -147,9 +152,9 @@ func (server *Plugin) initFactory() (roadrunner.Factory, error) { switch dsn[0] { // sockets group case "unix": - return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil case "tcp": - return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } @@ -165,12 +170,12 @@ func (server *Plugin) setEnv(e server.Env) []string { } func (server *Plugin) collectLogs(event interface{}) { - if we, ok := event.(roadrunner.WorkerEvent); ok { + if we, ok := event.(events.WorkerEvent); ok { switch we.Event { - case roadrunner.EventWorkerError: - server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) - case roadrunner.EventWorkerLog: - server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) + case events.EventWorkerError: + server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) + case events.EventWorkerLog: + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) } } } diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index fbd37e12..61c9a8f9 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -5,8 +5,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -14,12 +17,12 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, @@ -31,7 +34,7 @@ var testPoolConfig = roadrunner.PoolConfig{ type Foo struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error { @@ -44,7 +47,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -78,7 +81,7 @@ func (f *Foo) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index 4942d4c5..3b97efff 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -4,8 +4,10 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -13,7 +15,7 @@ import ( type Foo2 struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error { @@ -29,7 +31,7 @@ func (f *Foo2) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -59,7 +61,7 @@ func (f *Foo2) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 89757a02..2857dadc 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -4,8 +4,10 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -13,7 +15,7 @@ import ( type Foo3 struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error { @@ -29,7 +31,7 @@ func (f *Foo3) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -59,7 +61,7 @@ func (f *Foo3) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/process_state.go b/process.go index 1291a904..3efcdbc5 100755..100644 --- a/process_state.go +++ b/process.go @@ -3,6 +3,7 @@ package roadrunner import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/worker" ) // ProcessState provides information about specific worker. @@ -25,7 +26,7 @@ type ProcessState struct { } // WorkerProcessState creates new worker state definition. -func WorkerProcessState(w WorkerBase) (ProcessState, error) { +func WorkerProcessState(w worker.BaseProcess) (ProcessState, error) { const op = errors.Op("worker_process state") p, _ := process.NewProcess(int32(w.Pid())) i, err := p.MemoryInfo() @@ -41,18 +42,3 @@ func WorkerProcessState(w WorkerBase) (ProcessState, error) { MemoryUsage: i.RSS, }, nil } - -// ServerState returns list of all worker states of a given rr server. -func PoolState(pool Pool) ([]ProcessState, error) { - result := make([]ProcessState, 0) - for _, w := range pool.Workers() { - state, err := WorkerProcessState(w) - if err != nil { - return nil, err - } - - result = append(result, state) - } - - return result, nil -} diff --git a/state_test.go b/state_test.go deleted file mode 100755 index 10547a4b..00000000 --- a/state_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package roadrunner - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_NewState(t *testing.T) { - st := newState(StateErrored) - - assert.Equal(t, "errored", st.String()) - - assert.Equal(t, "inactive", newState(StateInactive).String()) - assert.Equal(t, "ready", newState(StateReady).String()) - assert.Equal(t, "working", newState(StateWorking).String()) - assert.Equal(t, "stopped", newState(StateStopped).String()) - assert.Equal(t, "undefined", newState(1000).String()) -} - -func Test_IsActive(t *testing.T) { - assert.False(t, newState(StateInactive).IsActive()) - assert.True(t, newState(StateReady).IsActive()) - assert.True(t, newState(StateWorking).IsActive()) - assert.False(t, newState(StateStopped).IsActive()) - assert.False(t, newState(StateErrored).IsActive()) -} diff --git a/sync_worker_test.go b/sync_worker_test.go deleted file mode 100755 index 0ef1e0cd..00000000 --- a/sync_worker_test.go +++ /dev/null @@ -1,263 +0,0 @@ -package roadrunner - -import ( - "context" - "os/exec" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/stretchr/testify/assert" -) - -func Test_Echo(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - res, err := syncWorker.Exec(EmptyPayload) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_NotStarted_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := InitBaseWorker(cmd) - assert.Contains(t, w.String(), "php tests/client.php echo pipes") - assert.Contains(t, w.String(), "inactive") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_NotStarted_Exec(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := InitBaseWorker(cmd) - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - assert.Contains(t, err.Error(), "WorkerProcess is not ready (inactive)") -} - -func Test_String(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_Echo_Slow(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - data := "" - mu := &sync.Mutex{} - w.AddListener(func(event interface{}) { - if wev, ok := event.(WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - }) - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { - t.Fail() - } - mu.Unlock() - assert.Error(t, w.Stop(ctx)) -} - -func Test_Error(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) - - if errors.Is(errors.ErrSoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello") -} - -func Test_NumExecs(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } - }() - - syncWorker, err := NewSyncWorker(w) - if err != nil { - t.Fatal(err) - } - - _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(1), w.State().NumExecs()) - - _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(2), w.State().NumExecs()) - - _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, int64(3), w.State().NumExecs()) -} diff --git a/util/events.go b/util/events.go deleted file mode 100755 index 43a3e646..00000000 --- a/util/events.go +++ /dev/null @@ -1,36 +0,0 @@ -package util - -type EventsHandler interface { - NumListeners() int - AddListener(listener EventListener) - Push(e interface{}) -} - -// Event listener listens for the events produced by worker, worker pool or other service. -type EventListener func(event interface{}) - -// EventHandler helps to broadcast events to multiple listeners. -type EventHandler struct { - listeners []EventListener -} - -func NewEventsHandler() EventsHandler { - return &EventHandler{listeners: make([]EventListener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *EventHandler) NumListeners() int { - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *EventHandler) AddListener(listener EventListener) { - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *EventHandler) Push(e interface{}) { - for _, listener := range eb.listeners { - listener(e) - } -} diff --git a/worker.go b/worker.go deleted file mode 100755 index 07c1e5c8..00000000 --- a/worker.go +++ /dev/null @@ -1,370 +0,0 @@ -package roadrunner - -import ( - "bytes" - "context" - "fmt" - "io" - "os" - "os/exec" - "strconv" - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/util" - - "github.com/spiral/goridge/v3" - "go.uber.org/multierr" -) - -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 25 * time.Millisecond - - // ReadBufSize used to make a slice with specified length to read from stderr - ReadBufSize = 10240 // Kb -) - -// EventWorkerKill thrown after WorkerProcess is being forcefully killed. -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError Event = iota + 200 - - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog -) - -type Event int64 - -func (ev Event) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - } - return "Unknown event type" -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event Event - - // Worker triggered the event. - Worker WorkerBase - - // Event specific payload. - Payload interface{} -} - -var pool = sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, -} - -type WorkerBase interface { - fmt.Stringer - - // Pid returns worker pid. - Pid() int64 - - // Created returns time worker was created at. - Created() time.Time - - // AddListener attaches listener to consume worker events. - AddListener(listener util.EventListener) - - // State return receive-only WorkerProcess state object, state can be used to safely access - // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() State - - // Start used to run Cmd and immediately return - Start() error - - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is - // complete and will return process error (if any), if stderr is presented it's value - // will be wrapped as WorkerError. Method will return error code if php process fails - // to find or Start the script. - Wait() error - - // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop(ctx context.Context) error - - // Kill kills underlying process, make sure to call Wait() func to gather - // error log from the stderr. Does not waits for process completion! - Kill() error - - // Relay returns attached to worker goridge relay - Relay() goridge.Relay - - // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl goridge.Relay) -} - -// WorkerProcess - supervised process with api over goridge.Relay. -type WorkerProcess struct { - // created indicates at what time WorkerProcess has been created. - created time.Time - - // updates parent supervisor or pool about WorkerProcess events - events util.EventsHandler - - // state holds information about current WorkerProcess state, - // number of WorkerProcess executions, buf status change time. - // publicly this object is receive-only and protected using Mutex - // and atomic counter. - state *state - - // underlying command with associated process, command must be - // provided to WorkerProcess from outside in non-started form. CmdSource - // stdErr direction will be handled by WorkerProcess to aggregate error message. - cmd *exec.Cmd - - // pid of the process, points to pid of underlying process and - // can be nil while process is not started. - pid int - - // stderr aggregates stderr output from underlying process. Value can be - // receive only once command is completed and all pipes are closed. - stderr *bytes.Buffer - - // channel is being closed once command is complete. - // waitDone chan interface{} - - // contains information about resulted process state. - endState *os.ProcessState - - // ensures than only one execution can be run at once. - mu sync.RWMutex - - // communication bus with underlying process. - relay goridge.Relay - rd io.Reader - stop chan struct{} -} - -// InitBaseWorker creates new WorkerProcess over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { - if cmd.Process != nil { - return nil, fmt.Errorf("can't attach to running process") - } - w := &WorkerProcess{ - created: time.Now(), - events: &util.EventHandler{}, - cmd: cmd, - state: newState(StateInactive), - stderr: new(bytes.Buffer), - stop: make(chan struct{}, 1), - } - - w.rd, w.cmd.Stderr = io.Pipe() - - // small buffer optimization - // at this point we know, that stderr will contain huge messages - w.stderr.Grow(ReadBufSize) - - go func() { - w.watch() - }() - - return w, nil -} - -// Pid returns worker pid. -func (w *WorkerProcess) Pid() int64 { - return int64(w.pid) -} - -// Created returns time worker was created at. -func (w *WorkerProcess) Created() time.Time { - return w.created -} - -// AddListener registers new worker event listener. -func (w *WorkerProcess) AddListener(listener util.EventListener) { - w.events.AddListener(listener) -} - -// State return receive-only WorkerProcess state object, state can be used to safely access -// WorkerProcess status, time when status changed and number of WorkerProcess executions. -func (w *WorkerProcess) State() State { - return w.state -} - -// State return receive-only WorkerProcess state object, state can be used to safely access -// WorkerProcess status, time when status changed and number of WorkerProcess executions. -func (w *WorkerProcess) AttachRelay(rl goridge.Relay) { - w.relay = rl -} - -// State return receive-only WorkerProcess state object, state can be used to safely access -// WorkerProcess status, time when status changed and number of WorkerProcess executions. -func (w *WorkerProcess) Relay() goridge.Relay { - return w.relay -} - -// String returns WorkerProcess description. fmt.Stringer interface -func (w *WorkerProcess) String() string { - st := w.state.String() - // we can safely compare pid to 0 - if w.pid != 0 { - st = st + ", pid:" + strconv.Itoa(w.pid) - } - - return fmt.Sprintf( - "(`%s` [%s], numExecs: %v)", - strings.Join(w.cmd.Args, " "), - st, - w.state.NumExecs(), - ) -} - -func (w *WorkerProcess) Start() error { - err := w.cmd.Start() - if err != nil { - return err - } - w.pid = w.cmd.Process.Pid - return nil -} - -// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is -// complete and will return process error (if any), if stderr is presented it's value -// will be wrapped as WorkerError. Method will return error code if php process fails -// to find or Start the script. -func (w *WorkerProcess) Wait() error { - const op = errors.Op("worker process wait") - err := multierr.Combine(w.cmd.Wait()) - - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then w.cmd.Wait return an error - w.endState = w.cmd.ProcessState - if err != nil { - w.state.Set(StateErrored) - - w.mu.RLock() - // if process return code > 0, here will be an error from stderr (if presents) - if w.stderr.Len() > 0 { - err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) - // stop the stderr buffer - w.stop <- struct{}{} - } - w.mu.RUnlock() - - return multierr.Append(err, w.closeRelay()) - } - - err = multierr.Append(err, w.closeRelay()) - if err != nil { - w.state.Set(StateErrored) - return err - } - - if w.endState.Success() { - w.state.Set(StateStopped) - } - - return nil -} - -func (w *WorkerProcess) closeRelay() error { - if w.relay != nil { - err := w.relay.Close() - if err != nil { - return err - } - } - return nil -} - -// Stop sends soft termination command to the WorkerProcess and waits for process completion. -func (w *WorkerProcess) Stop(ctx context.Context) error { - c := make(chan error) - - go func() { - var err error - w.state.Set(StateStopping) - err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true})) - if err != nil { - w.state.Set(StateKilling) - c <- multierr.Append(err, w.cmd.Process.Kill()) - } - w.state.Set(StateStopped) - c <- nil - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - if err != nil { - return err - } - return nil - } -} - -// Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not waits for process completion! -func (w *WorkerProcess) Kill() error { - w.state.Set(StateKilling) - err := w.cmd.Process.Signal(os.Kill) - if err != nil { - return err - } - w.state.Set(StateStopped) - return nil -} - -// put the pointer, to not allocate new slice -// but erase it len and then return back -func (w *WorkerProcess) put(data *[]byte) { - *data = (*data)[:0] - *data = (*data)[:cap(*data)] - - pool.Put(data) -} - -// get pointer to the byte slice -func (w *WorkerProcess) get() *[]byte { - return pool.Get().(*[]byte) -} - -// Write appends the contents of pool to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of pool; errBuffer is always nil. -func (w *WorkerProcess) watch() { - go func() { - for { - select { - case <-w.stop: - buf := w.get() - // read the last data - n, _ := w.rd.Read(*buf) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - return - default: - // read the max 10kb of stderr per one read - buf := w.get() - n, _ := w.rd.Read(*buf) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - } - } - }() -} diff --git a/worker_test.go b/worker_test.go deleted file mode 100755 index e82d383e..00000000 --- a/worker_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package roadrunner - -import ( - "context" - "os/exec" - "sync" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_GetState(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, StateReady, w.State().Value()) - err = w.Stop(ctx) - if err != nil { - t.Errorf("error stopping the WorkerProcess: error %v", err) - } -} - -func Test_Kill(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - // TODO changed from stopped, discuss - assert.Equal(t, StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the WorkerProcess: error %v", err) - } - wg.Wait() -} - -func Test_OnStarted(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") - assert.Nil(t, cmd.Start()) - - w, err := InitBaseWorker(cmd) - assert.Nil(t, w) - assert.NotNil(t, err) - - assert.Equal(t, "can't attach to running process", err.Error()) -} |