author     Valery Piashchynski <[email protected]>  2020-12-17 12:13:55 +0300
committer  GitHub <[email protected]>  2020-12-17 12:13:55 +0300
commit     ee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (patch)
tree       2c99d4c6e2b2e9e3fa155d5d68a9d471c9aeeb9b
parent     a1dc59cabb6e63eab232922f4eb5a19dbd168f44 (diff)
parent     edf924b37bcdad14eb31014c571ab58720aa178f (diff)
Merge pull request #452 from spiral/refactor/split (tag: v2.0.0-alpha23)
Refactor/split
-rw-r--r--  .github/workflows/build.yml  14
-rwxr-xr-x  Makefile  6
-rwxr-xr-x  go.sum  1
-rw-r--r--  interfaces/events/handler.go  10
-rw-r--r--  interfaces/events/pool_events.go  65
-rw-r--r--  interfaces/events/worker_events.go  34
-rwxr-xr-x  interfaces/factory/factory.go  22
-rw-r--r--  interfaces/informer/interface.go  6
-rw-r--r-- [-rwxr-xr-x]  interfaces/pool/pool.go (renamed from pool.go)  60
-rw-r--r--  interfaces/server/interface.go  8
-rw-r--r-- [-rwxr-xr-x]  interfaces/worker/factory.go (renamed from factory.go)  10
-rw-r--r--  interfaces/worker/watcher.go  26
-rw-r--r--  interfaces/worker/worker.go  62
-rwxr-xr-x  internal/payload.go (renamed from payload.go)  2
-rwxr-xr-x  internal/protocol.go (renamed from protocol.go)  10
-rwxr-xr-x  internal/state.go (renamed from state.go)  37
-rwxr-xr-x  internal/state_test.go  27
-rwxr-xr-x  pipe_factory_test.go  238
-rwxr-xr-x  pkg/events/events.go  38
-rwxr-xr-x  pkg/pipe/pipe_factory.go (renamed from pipe_factory.go)  35
-rwxr-xr-x  pkg/pipe/pipe_factory_test.go  511
-rw-r--r--  pkg/pool/config.go  75
-rwxr-xr-x  pkg/pool/static_pool.go (renamed from static_pool.go)  119
-rwxr-xr-x  pkg/pool/static_pool_test.go (renamed from static_pool_test.go)  161
-rwxr-xr-x  pkg/pool/supervisor_pool.go (renamed from supervisor_pool.go)  70
-rw-r--r--  pkg/pool/supervisor_test.go (renamed from supervisor_test.go)  31
-rwxr-xr-x  pkg/socket/socket_factory.go (renamed from socket_factory.go)  47
-rwxr-xr-x  pkg/socket/socket_factory_test.go (renamed from socket_factory_test.go)  82
-rwxr-xr-x  pkg/worker/sync_worker.go (renamed from sync_worker.go)  85
-rwxr-xr-x  pkg/worker/sync_worker_test.go  37
-rwxr-xr-x  pkg/worker/worker.go  302
-rwxr-xr-x  pkg/worker/worker_test.go  19
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go (renamed from worker_watcher.go)  91
-rw-r--r--  plugins/http/config.go  6
-rw-r--r--  plugins/http/handler.go  18
-rw-r--r--  plugins/http/plugin.go  29
-rw-r--r--  plugins/http/request.go  10
-rw-r--r--  plugins/http/response.go  4
-rw-r--r--  plugins/http/tests/configs/.rr-http.yaml  13
-rw-r--r--  plugins/http/tests/handler_test.go  165
-rw-r--r--  plugins/http/tests/http_test.go  5
-rw-r--r--  plugins/http/tests/response_test.go  16
-rw-r--r--  plugins/http/tests/uploads_test.go  27
-rw-r--r--  plugins/informer/plugin.go  6
-rw-r--r--  plugins/informer/tests/test_plugin.go  9
-rw-r--r--  plugins/reload/tests/reload_plugin_test.go  4
-rw-r--r--  plugins/resetter/tests/test_plugin.go  6
-rw-r--r--  plugins/server/plugin.go  33
-rw-r--r--  plugins/server/tests/plugin_pipes.go  15
-rw-r--r--  plugins/server/tests/plugin_sockets.go  10
-rw-r--r--  plugins/server/tests/plugin_tcp.go  10
-rw-r--r-- [-rwxr-xr-x]  process.go (renamed from process_state.go)  18
-rwxr-xr-x  state_test.go  27
-rwxr-xr-x  sync_worker_test.go  263
-rwxr-xr-x  util/events.go  36
-rwxr-xr-x  worker.go  370
-rwxr-xr-x  worker_test.go  66
57 files changed, 1850 insertions, 1657 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index f69d672a..04545212 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -14,7 +14,7 @@ jobs:
golang:
name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
- timeout-minutes: 20
+ timeout-minutes: 25
strategy:
fail-fast: false
matrix:
@@ -65,7 +65,11 @@ jobs:
- name: Run golang tests on Windows without codecov
if: ${{ matrix.os == 'windows-latest' }}
run: |
- go test -v -race -cover -tags=debug .
+ go test -v -race -cover -tags=debug ./pkg/pipe
+ go test -v -race -cover -tags=debug ./pkg/pool
+ go test -v -race -cover -tags=debug ./pkg/socket
+ go test -v -race -cover -tags=debug ./pkg/worker
+ go test -v -race -cover -tags=debug ./pkg/worker_watcher
go test -v -race -cover -tags=debug ./plugins/rpc
go test -v -race -cover -tags=debug ./plugins/rpc/tests
go test -v -race -cover -tags=debug ./plugins/config/tests
@@ -86,7 +90,11 @@ jobs:
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }}
run: |
mkdir ./coverage-ci
- go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/lib.txt -covermode=atomic .
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker_watcher.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc_config.txt -covermode=atomic ./plugins/rpc
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./plugins/rpc/tests
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/plugin_config.txt -covermode=atomic ./plugins/config/tests
diff --git a/Makefile b/Makefile
index 487af5c8..9f86fa4a 100755
--- a/Makefile
+++ b/Makefile
@@ -24,7 +24,11 @@ uninstall: ## Uninstall locally installed RR
rm -f /usr/local/bin/rr
test: ## Run application tests
- go test -v -race -cover -tags=debug -covermode=atomic .
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc/tests
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/config/tests
diff --git a/go.sum b/go.sum
index 90cc58fb..83876b77 100755
--- a/go.sum
+++ b/go.sum
@@ -384,6 +384,7 @@ github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U=
github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM=
github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
+github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go
new file mode 100644
index 00000000..01f64d73
--- /dev/null
+++ b/interfaces/events/handler.go
@@ -0,0 +1,10 @@
+package events
+
+type Handler interface {
+ NumListeners() int
+ AddListener(listener EventListener)
+ Push(e interface{})
+}
+
+// EventListener listens for events produced by a worker, worker pool or other service.
+type EventListener func(event interface{})
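
The intended usage pattern for this interface, visible in StaticPool further down this diff, is to embed a Handler and forward AddListener to it. A compressed sketch, not code from this commit; the component type is hypothetical, and PoolEvent/EventPoolError come from pool_events.go below:

package sketch

import (
    "github.com/spiral/roadrunner/v2/interfaces/events"
)

// component owns an events.Handler and re-exposes AddListener,
// the same shape StaticPool uses further down in this diff.
type component struct {
    events events.Handler
}

func (c *component) AddListener(l events.EventListener) {
    c.events.AddListener(l)
}

// fail broadcasts an error to every subscribed listener.
func (c *component) fail(err error) {
    c.events.Push(events.PoolEvent{Event: events.EventPoolError, Payload: err})
}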
diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go
new file mode 100644
index 00000000..cc32f6b2
--- /dev/null
+++ b/interfaces/events/pool_events.go
@@ -0,0 +1,65 @@
+package events
+
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 7800
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and the worker allocation timeout has elapsed
+ EventNoFreeWorkers
+
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // todo: EventTTL thrown when worker is removed due to TTL being reached. Context is rr.WorkerError
+ EventTTL
+
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool can also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go
new file mode 100644
index 00000000..497f0a06
--- /dev/null
+++ b/interfaces/events/worker_events.go
@@ -0,0 +1,34 @@
+package events
+
+// EventWorkerKill thrown after WorkerProcess is forcefully killed.
+const (
+ // EventWorkerError triggered on WorkerProcess error. Expect the payload to be an error.
+ EventWorkerError E = iota + 200
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Expect the payload to be a []byte string.
+ EventWorkerLog
+)
+
+type E int64
+
+func (ev E) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event E
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}
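
With both event families in place, a single listener can route them by concrete type, and the P and E String() methods keep logs readable. A minimal sketch against the new import path; the routing itself is an illustration, not code from this commit:

package sketch

import (
    "log"

    "github.com/spiral/roadrunner/v2/interfaces/events"
)

// listener satisfies events.EventListener and fans events out by type.
func listener(event interface{}) {
    switch ev := event.(type) {
    case events.PoolEvent:
        if ev.Event == events.EventNoFreeWorkers {
            log.Printf("pool: %s: %v", ev.Event, ev.Payload)
        }
    case events.WorkerEvent:
        log.Printf("worker %v: %s: %v", ev.Worker, ev.Event, ev.Payload)
    }
}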
diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go
new file mode 100755
index 00000000..51b73501
--- /dev/null
+++ b/interfaces/factory/factory.go
@@ -0,0 +1,22 @@
+package worker
+
+import (
+ "context"
+ "os/exec"
+
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+)
+
+// Factory is responsible for wrapping a given command into a WorkerProcess.
+type Factory interface {
+ // SpawnWorkerWithContext creates a new WorkerProcess based on the given command, with context.
+ // Process must not be started.
+ SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error)
+
+ // SpawnWorker creates a new WorkerProcess based on the given command.
+ // Process must not be started.
+ SpawnWorker(*exec.Cmd) (worker.BaseProcess, error)
+
+ // Close the factory and underlying connections.
+ Close(ctx context.Context) error
+}
diff --git a/interfaces/informer/interface.go b/interfaces/informer/interface.go
index a8d32841..b975edd7 100644
--- a/interfaces/informer/interface.go
+++ b/interfaces/informer/interface.go
@@ -1,8 +1,10 @@
package informer
-import "github.com/spiral/roadrunner/v2"
+import (
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+)
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []roadrunner.WorkerBase
+ Workers() []worker.BaseProcess
}
diff --git a/pool.go b/interfaces/pool/pool.go
index 3e38c4cb..a1015fd6 100755..100644
--- a/pool.go
+++ b/interfaces/pool/pool.go
@@ -1,76 +1,40 @@
-package roadrunner
+package pool
import (
"context"
"runtime"
"time"
- "github.com/spiral/roadrunner/v2/util"
-)
-
-// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
-type PoolEvent struct {
- // Event type, see below.
- Event int64
-
- // Payload depends on event type, typically it's worker or error.
- Payload interface{}
-}
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct = iota + 7800
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventPoolError caused on pool wide errors.
- EventPoolError
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // todo: EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
- EventTTL
-
- // todo: EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
)
// Pool managed set of inner worker processes.
type Pool interface {
// AddListener connects event listener to the pool.
- AddListener(listener util.EventListener)
+ AddListener(listener events.EventListener)
// GetConfig returns pool configuration.
- GetConfig() PoolConfig
+ GetConfig() interface{}
// Exec
- Exec(rqs Payload) (Payload, error)
+ Exec(rqs internal.Payload) (internal.Payload, error)
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error)
// Workers returns worker list associated with the pool.
- Workers() (workers []WorkerBase)
+ Workers() (workers []worker.BaseProcess)
// Remove worker from the pool.
- RemoveWorker(worker WorkerBase) error
+ RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
}
// Configures the pool behaviour.
-type PoolConfig struct {
+type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
@@ -96,7 +60,7 @@ type PoolConfig struct {
}
// InitDefaults enables default config values.
-func (cfg *PoolConfig) InitDefaults() {
+func (cfg *Config) InitDefaults() {
if cfg.NumWorkers == 0 {
cfg.NumWorkers = int64(runtime.NumCPU())
}
diff --git a/interfaces/server/interface.go b/interfaces/server/interface.go
index 2dae30c5..c50848d7 100644
--- a/interfaces/server/interface.go
+++ b/interfaces/server/interface.go
@@ -4,7 +4,9 @@ import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
)
type Env map[string]string
@@ -12,6 +14,6 @@ type Env map[string]string
// Server creates workers for the application.
type Server interface {
CmdFactory(env Env) (func() *exec.Cmd, error)
- NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
- NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env Env) (roadrunner.Pool, error)
+ NewWorker(ctx context.Context, env Env) (worker.BaseProcess, error)
+ NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env) (pool.Pool, error)
}
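
A sketch of how a plugin might consume the reworked interface; the worker count, the timeouts and the "RR_MODE" env key are illustrative assumptions, not part of this diff:

package sketch

import (
    "context"
    "time"

    "github.com/spiral/roadrunner/v2/interfaces/pool"
    "github.com/spiral/roadrunner/v2/interfaces/server"
    poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
)

// newPool asks the server plugin for a ready worker pool.
func newPool(srv server.Server) (pool.Pool, error) {
    return srv.NewWorkerPool(context.Background(), poolImpl.Config{
        NumWorkers:      4,
        AllocateTimeout: time.Minute,
        DestroyTimeout:  time.Minute,
    }, server.Env{"RR_MODE": "http"}) // env key is an assumption for the sketch
}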
diff --git a/factory.go b/interfaces/worker/factory.go
index 482d39f8..19e2bf5d 100755..100644
--- a/factory.go
+++ b/interfaces/worker/factory.go
@@ -1,4 +1,4 @@
-package roadrunner
+package worker
import (
"context"
@@ -7,12 +7,12 @@ import (
// Factory is responsible for wrapping a given command into a WorkerProcess.
type Factory interface {
+ // SpawnWorkerWithContext creates a new WorkerProcess based on the given command, with context.
+ // Process must not be started.
+ SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error)
// SpawnWorker creates a new WorkerProcess based on the given command.
// Process must not be started.
- SpawnWorkerWithContext(context.Context, *exec.Cmd) (WorkerBase, error)
-
- SpawnWorker(*exec.Cmd) (WorkerBase, error)
-
+ SpawnWorker(*exec.Cmd) (BaseProcess, error)
// Close the factory and underlying connections.
Close(ctx context.Context) error
}
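
Since the pipe and socket factories below both satisfy this interface, spawning code can be written against it generically. A sketch with a spawn deadline; the PHP script path and the 5s budget are assumptions:

package sketch

import (
    "context"
    "os/exec"
    "time"

    "github.com/spiral/roadrunner/v2/interfaces/worker"
)

// spawn works with any Factory implementation, pipe- or socket-based.
func spawn(f worker.Factory) (worker.BaseProcess, error) {
    // Bound the handshake; SpawnWorkerWithContext is expected to abort
    // if the context expires before the worker reports its PID.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    return f.SpawnWorkerWithContext(ctx, exec.Command("php", "tests/client.php", "echo", "pipes"))
}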
diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go
new file mode 100644
index 00000000..ce2c1c5a
--- /dev/null
+++ b/interfaces/worker/watcher.go
@@ -0,0 +1,26 @@
+package worker
+
+import "context"
+
+type Watcher interface {
+ // AddToWatch adds workers to the watch stack to track their state
+ AddToWatch(workers []BaseProcess) error
+
+ // GetFreeWorker provides the first free worker
+ GetFreeWorker(ctx context.Context) (BaseProcess, error)
+
+ // PushWorker enqueues the worker back
+ PushWorker(w BaseProcess)
+
+ // AllocateNew allocates a new worker and puts it into the WorkerWatcher
+ AllocateNew() error
+
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+
+ // WorkersList returns all workers without removing them from the internal storage
+ WorkersList() []BaseProcess
+
+ // RemoveWorker removes a worker from the stack
+ RemoveWorker(wb BaseProcess) error
+}
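
In order of a typical request cycle the contract reads: watch, borrow, execute, return. A sketch against the interface only (the concrete implementation lands in pkg/worker_watcher below; the one-minute budget is an assumption):

package sketch

import (
    "context"
    "time"

    "github.com/spiral/roadrunner/v2/interfaces/worker"
)

// serveOne borrows a worker for one request and hands it back.
func serveOne(ww worker.Watcher, ws []worker.BaseProcess) error {
    if err := ww.AddToWatch(ws); err != nil {
        return err
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    w, err := ww.GetFreeWorker(ctx) // blocks until a worker frees up or ctx expires
    if err != nil {
        return err
    }
    // ... execute a payload on w here ...
    ww.PushWorker(w) // return it to the stack for the next request
    return nil
}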
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
new file mode 100644
index 00000000..edbc68d9
--- /dev/null
+++ b/interfaces/worker/worker.go
@@ -0,0 +1,62 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (BaseProcess, error)
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // AddListener attaches listener to consume worker events.
+ AddListener(listener events.EventListener)
+
+ // State returns a receive-only WorkerProcess state object; the state can be used to safely access
+ // the WorkerProcess status, the time the status changed and the number of WorkerProcess executions.
+ State() internal.State
+
+ // Start runs the Cmd and immediately returns
+ Start() error
+
+ // Wait must be called once for each WorkerProcess; the call is released once the WorkerProcess is
+ // complete and returns the process error (if any). If stderr is present, its value
+ // will be wrapped as WorkerError. The method returns an error code if the php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends a soft termination command to the WorkerProcess and waits for process completion.
+ Stop(ctx context.Context) error
+
+ // Kill kills the underlying process; make sure to call Wait() to gather the
+ // error log from stderr. Does not wait for process completion!
+ Kill() error
+
+ // Relay returns the goridge relay attached to the worker
+ Relay() goridge.Relay
+
+ // AttachRelay attaches a goridge relay to the worker process
+ AttachRelay(rl goridge.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec executes a payload on the SyncWorker; there are no TIMEOUTS
+ Exec(rqs internal.Payload) (internal.Payload, error)
+ // ExecWithContext handles Exec with a TTL
+ ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error)
+}
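
The practical difference between Exec and ExecWithContext is the TTL. A sketch of bounding a single call, assuming an already-spawned BaseProcess and the From() constructor from pkg/worker shown later in this diff:

package sketch

import (
    "context"
    "time"

    "github.com/spiral/roadrunner/v2/interfaces/worker"
    "github.com/spiral/roadrunner/v2/internal"
    workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
)

// execWithTTL wraps a BaseProcess into a SyncWorker and enforces a
// per-exec deadline; plain Exec would block for as long as the task runs.
func execWithTTL(w worker.BaseProcess, body []byte, ttl time.Duration) (internal.Payload, error) {
    sw, err := workerImpl.From(w)
    if err != nil {
        return internal.Payload{}, err
    }
    ctx, cancel := context.WithTimeout(context.Background(), ttl)
    defer cancel()
    return sw.ExecWithContext(ctx, internal.Payload{Body: body})
}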
diff --git a/payload.go b/internal/payload.go
index 502dfadd..63983bad 100755
--- a/payload.go
+++ b/internal/payload.go
@@ -1,4 +1,4 @@
-package roadrunner
+package internal
// Payload carries binary header and body to stack and
// back to the server.
diff --git a/protocol.go b/internal/protocol.go
index ee2d8245..5aa681eb 100755
--- a/protocol.go
+++ b/internal/protocol.go
@@ -1,4 +1,4 @@
-package roadrunner
+package internal
import (
"os"
@@ -10,7 +10,7 @@ import (
var json = j.ConfigCompatibleWithStandardLibrary
-type stopCommand struct {
+type StopCommand struct {
Stop bool `json:"stop"`
}
@@ -18,7 +18,7 @@ type pidCommand struct {
Pid int `json:"pid"`
}
-func sendControl(rl goridge.Relay, v interface{}) error {
+func SendControl(rl goridge.Relay, v interface{}) error {
const op = errors.Op("send control frame")
frame := goridge.NewFrame()
frame.WriteVersion(goridge.VERSION_1)
@@ -58,9 +58,9 @@ func sendControl(rl goridge.Relay, v interface{}) error {
return nil
}
-func fetchPID(rl goridge.Relay) (int64, error) {
+func FetchPID(rl goridge.Relay) (int64, error) {
const op = errors.Op("fetchPID")
- err := sendControl(rl, pidCommand{Pid: os.Getpid()})
+ err := SendControl(rl, pidCommand{Pid: os.Getpid()})
if err != nil {
return 0, errors.E(op, err)
}
diff --git a/state.go b/internal/state.go
index 45e90f96..8f7d939b 100755
--- a/state.go
+++ b/internal/state.go
@@ -1,4 +1,4 @@
-package roadrunner
+package internal
import (
"fmt"
@@ -8,10 +8,9 @@ import (
// State represents WorkerProcess status and updated time.
type State interface {
fmt.Stringer
-
- // Value returns state value
+ // Value returns WorkerState value
Value() int64
- // Set sets the state
+ // Set sets the WorkerState
Set(value int64)
// NumJobs shows how many times WorkerProcess was invoked
NumExecs() int64
@@ -49,13 +48,13 @@ const (
// StateStopped - process has been terminated.
StateStopped
- // StateErrored - error state (can't be used).
+ // StateErrored - error WorkerState (can't be used).
StateErrored
StateRemove
)
-type state struct {
+type WorkerState struct {
value int64
numExecs int64
// to be lightweight, use UnixNano
@@ -63,12 +62,12 @@ type state struct {
}
// Thread safe
-func newState(value int64) *state {
- return &state{value: value}
+func NewWorkerState(value int64) *WorkerState {
+ return &WorkerState{value: value}
}
-// String returns current state as string.
-func (s *state) String() string {
+// String returns current WorkerState as string.
+func (s *WorkerState) String() string {
switch s.Value() {
case StateInactive:
return "inactive"
@@ -88,36 +87,36 @@ func (s *state) String() string {
}
// NumExecs returns number of registered WorkerProcess execs.
-func (s *state) NumExecs() int64 {
+func (s *WorkerState) NumExecs() int64 {
return atomic.LoadInt64(&s.numExecs)
}
-// Value state returns state value
-func (s *state) Value() int64 {
+// Value WorkerState returns WorkerState value
+func (s *WorkerState) Value() int64 {
return atomic.LoadInt64(&s.value)
}
// IsActive returns true if WorkerProcess not Inactive or Stopped
-func (s *state) IsActive() bool {
+func (s *WorkerState) IsActive() bool {
val := s.Value()
return val == StateWorking || val == StateReady
}
-// change state value (status)
-func (s *state) Set(value int64) {
+// change WorkerState value (status)
+func (s *WorkerState) Set(value int64) {
atomic.StoreInt64(&s.value, value)
}
// register new execution atomically
-func (s *state) RegisterExec() {
+func (s *WorkerState) RegisterExec() {
atomic.AddInt64(&s.numExecs, 1)
}
// Update last used time
-func (s *state) SetLastUsed(lu uint64) {
+func (s *WorkerState) SetLastUsed(lu uint64) {
atomic.StoreUint64(&s.lastUsed, lu)
}
-func (s *state) LastUsed() uint64 {
+func (s *WorkerState) LastUsed() uint64 {
return atomic.LoadUint64(&s.lastUsed)
}
diff --git a/internal/state_test.go b/internal/state_test.go
new file mode 100755
index 00000000..bdb05825
--- /dev/null
+++ b/internal/state_test.go
@@ -0,0 +1,27 @@
+package internal
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NewState(t *testing.T) {
+ st := NewWorkerState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+
+ assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "undefined", NewWorkerState(1000).String())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, NewWorkerState(StateInactive).IsActive())
+ assert.True(t, NewWorkerState(StateReady).IsActive())
+ assert.True(t, NewWorkerState(StateWorking).IsActive())
+ assert.False(t, NewWorkerState(StateStopped).IsActive())
+ assert.False(t, NewWorkerState(StateErrored).IsActive())
+}
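
Since WorkerState is built on atomics, its counters are safe to bump from many goroutines without extra locking; a sketch of a test that is not part of this commit:

package internal

import (
    "sync"
    "testing"

    "github.com/stretchr/testify/assert"
)

// Sketch only: hammers RegisterExec from 100 goroutines.
func Test_RegisterExec_Concurrent(t *testing.T) {
    st := NewWorkerState(StateReady)
    wg := &sync.WaitGroup{}
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            st.RegisterExec() // atomic.AddInt64 under the hood
        }()
    }
    wg.Wait()
    assert.Equal(t, int64(100), st.NumExecs())
}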
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
deleted file mode 100755
index c742f522..00000000
--- a/pipe_factory_test.go
+++ /dev/null
@@ -1,238 +0,0 @@
-package roadrunner
-
-import (
- "context"
- "os/exec"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Pipe_Start(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- assert.NoError(t, w.Stop(ctx))
-}
-
-func Test_Pipe_StartError(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- err := cmd.Start()
- if err != nil {
- t.Errorf("error running the command: error %v", err)
- }
-
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_PipeError(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- _, err := cmd.StdinPipe()
- if err != nil {
- t.Errorf("error creating the STDIN pipe: error %v", err)
- }
-
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_PipeError2(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- _, err := cmd.StdinPipe()
- if err != nil {
- t.Errorf("error creating the STDIN pipe: error %v", err)
- }
-
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_Failboot(t *testing.T) {
- cmd := exec.Command("php", "tests/failboot.php")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
-
- assert.Nil(t, w)
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
-}
-
-func Test_Pipe_Invalid(t *testing.T) {
- cmd := exec.Command("php", "tests/invalid.php")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_Echo(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- err = w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- sw, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- res, err := sw.Exec(Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Pipe_Broken(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop(ctx)
- assert.Error(t, err)
- }()
-
- sw, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- res, err := sw.Exec(Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
-}
-
-func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
- f := NewPipeFactory()
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
- go func() {
- if w.Wait() != nil {
- b.Fail()
- }
- }()
-
- err := w.Stop(context.Background())
- if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
- sw, err := NewSyncWorker(w)
- if err != nil {
- b.Fatal(err)
- }
- b.ReportAllocs()
- b.ResetTimer()
- go func() {
- err := w.Wait()
- if err != nil {
- b.Errorf("error waiting the worker: error %v", err)
- }
- }()
- defer func() {
- err := w.Stop(context.Background())
- if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
- }
- }()
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
-
- defer func() {
- err = w.Stop(ctx)
- if err != nil {
- b.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- sw, err := NewSyncWorker(w)
- if err != nil {
- b.Fatal(err)
- }
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
-
- defer func() {
- err = w.Stop(ctx)
- if err != nil {
- b.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- sw, err := NewSyncWorker(w)
- if err != nil {
- b.Fatal(err)
- }
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
diff --git a/pkg/events/events.go b/pkg/events/events.go
new file mode 100755
index 00000000..92dc103a
--- /dev/null
+++ b/pkg/events/events.go
@@ -0,0 +1,38 @@
+package events
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+)
+
+// EventHandler helps to broadcast events to multiple listeners.
+type EventHandler struct {
+ listeners []events.EventListener
+ sync.RWMutex
+}
+
+func NewEventsHandler() events.Handler {
+ return &EventHandler{listeners: make([]events.EventListener, 0, 2)}
+}
+
+// NumListeners returns number of event listeners.
+func (eb *EventHandler) NumListeners() int {
+ return len(eb.listeners)
+}
+
+// AddListener registers new event listener.
+func (eb *EventHandler) AddListener(listener events.EventListener) {
+ eb.Lock()
+ defer eb.Unlock()
+ eb.listeners = append(eb.listeners, listener)
+}
+
+// Push broadcast events across all event listeners.
+func (eb *EventHandler) Push(e interface{}) {
+ eb.Lock()
+ defer eb.Unlock()
+ for k := range eb.listeners {
+ eb.listeners[k](e)
+ }
+}
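
Wiring the handler up takes three calls; a minimal sketch using nothing beyond this file:

package main

import (
    "fmt"

    eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
)

func main() {
    h := eventsPkg.NewEventsHandler()
    h.AddListener(func(event interface{}) {
        fmt.Printf("event: %v\n", event)
    })
    h.Push("worker spawned") // fans out to every registered listener
    fmt.Println("listeners:", h.NumListeners())
}

Note that Push takes the write lock rather than the read lock, so listeners run serially while producers are blocked; a slow listener back-pressures whoever emits events.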
diff --git a/pipe_factory.go b/pkg/pipe/pipe_factory.go
index db00c989..c86d78c4 100755
--- a/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -1,4 +1,4 @@
-package roadrunner
+package pipe
import (
"context"
@@ -6,33 +6,36 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
-// PipeFactory connects to stack using standard
+// Factory connects to stack using standard
// streams (STDIN, STDOUT pipes).
-type PipeFactory struct{}
+type Factory struct{}
// NewPipeFactory returns new factory instance and starts
// listening
// todo: review tests
-func NewPipeFactory() Factory {
- return &PipeFactory{}
+func NewPipeFactory() worker.Factory {
+ return &Factory{}
}
type SpawnResult struct {
- w WorkerBase
+ w worker.BaseProcess
err error
}
-// SpawnWorker creates new WorkerProcess and connects it to goridge relay,
+// SpawnWorkerWithContext creates a new Process and connects it to the goridge relay;
// the Wait() method must be handled on the level above.
-func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) {
+func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
- w, err := InitBaseWorker(cmd)
+ w, err := workerImpl.InitBaseWorker(cmd)
if err != nil {
c <- SpawnResult{
w: nil,
@@ -76,7 +79,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
}
// errors bundle
- pid, err := fetchPID(relay)
+ pid, err := internal.FetchPID(relay)
if pid != w.Pid() || err != nil {
err = multierr.Combine(
err,
@@ -91,7 +94,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
}
// everything ok, set ready state
- w.State().Set(StateReady)
+ w.State().Set(internal.StateReady)
// return worker
c <- SpawnResult{
@@ -111,9 +114,9 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
}
}
-func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn worker")
- w, err := InitBaseWorker(cmd)
+ w, err := workerImpl.InitBaseWorker(cmd)
if err != nil {
return nil, errors.E(op, err)
}
@@ -141,7 +144,7 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
// errors bundle
- if pid, err := fetchPID(relay); pid != w.Pid() {
+ if pid, err := internal.FetchPID(relay); pid != w.Pid() {
err = multierr.Combine(
err,
w.Kill(),
@@ -151,11 +154,11 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
// everything ok, set ready state
- w.State().Set(StateReady)
+ w.State().Set(internal.StateReady)
return w, nil
}
// Close the factory.
-func (f *PipeFactory) Close(ctx context.Context) error {
+func (f *Factory) Close(ctx context.Context) error {
return nil
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
new file mode 100755
index 00000000..99212ff8
--- /dev/null
+++ b/pkg/pipe/pipe_factory_test.go
@@ -0,0 +1,511 @@
+package pipe
+
+import (
+ "context"
+ "os/exec"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/internal"
+ workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_GetState(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ assert.Equal(t, internal.StateStopped, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ err = w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Kill(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ assert.Error(t, w.Wait())
+ // TODO changed from stopped, discuss
+ assert.Equal(t, internal.StateErrored, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ err = w.Kill()
+ if err != nil {
+ t.Errorf("error killing the Process: error %v", err)
+ }
+ wg.Wait()
+}
+
+func Test_Pipe_Start(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ assert.NoError(t, w.Stop(ctx))
+}
+
+func Test_Pipe_StartError(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ err := cmd.Start()
+ if err != nil {
+ t.Errorf("error running the command: error %v", err)
+ }
+
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_PipeError(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
+
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_PipeError2(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
+
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Failboot(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/failboot.php")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Pipe_Invalid(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/invalid.php")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Echo(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Pipe_Broken(t *testing.T) {
+ cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ time.Sleep(time.Second)
+ err = w.Stop(ctx)
+ assert.Error(t, err)
+ }()
+
+ sw, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+}
+
+func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
+ f := NewPipeFactory()
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
+ go func() {
+ if w.Wait() != nil {
+ b.Fail()
+ }
+ }()
+
+ err := w.Stop(context.Background())
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
+ sw, err := workerImpl.From(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+ b.ReportAllocs()
+ b.ResetTimer()
+ go func() {
+ err := w.Wait()
+ if err != nil {
+ b.Errorf("error waiting the worker: error %v", err)
+ }
+ }()
+ defer func() {
+ err := w.Stop(context.Background())
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop(ctx)
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw, err := workerImpl.From(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop(ctx)
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw, err := workerImpl.From(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Test_Echo(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ syncWorker, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go func() {
+ assert.NoError(t, syncWorker.Wait())
+ }()
+ defer func() {
+ err := syncWorker.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_BadPayload(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+
+ syncWorker, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go func() {
+ assert.NoError(t, syncWorker.Wait())
+ }()
+ defer func() {
+ err := syncWorker.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ res, err := syncWorker.Exec(internal.Payload{})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Contains(t, err.Error(), "payload can not be empty")
+}
+
+func Test_String(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "ready")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_Echo_Slow(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ syncWorker, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Broken(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ data := ""
+ mu := &sync.Mutex{}
+ w.AddListener(func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ mu.Lock()
+ data = string(wev.Payload.([]byte))
+ mu.Unlock()
+ }
+ })
+
+ syncWorker, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ time.Sleep(time.Second * 3)
+ mu.Lock()
+ if strings.ContainsAny(data, "undefined_function()") == false {
+ t.Fail()
+ }
+ mu.Unlock()
+ assert.Error(t, w.Stop(ctx))
+}
+
+func Test_Error(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ syncWorker, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.ErrSoftJob, err) == false {
+ t.Fatal("error should be of type errors.ErrSoftJob")
+ }
+ assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello")
+}
+
+func Test_NumExecs(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ syncWorker, err := workerImpl.From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, int64(1), w.State().NumExecs())
+
+ _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, int64(2), w.State().NumExecs())
+
+ _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, int64(3), w.State().NumExecs())
+}
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
new file mode 100644
index 00000000..3dcc3584
--- /dev/null
+++ b/pkg/pool/config.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+ "runtime"
+ "time"
+)
+
+// Configures the pool behaviour.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper during hot-swap. Defaults to the number of CPU cores.
+ NumWorkers int64
+
+ // MaxJobs defines how many executions are allowed for the worker until
+ // its destruction. Set 1 to create a new process for each new task, 0 to let
+ // the worker handle as many tasks as it can.
+ MaxJobs int64
+
+ // AllocateTimeout defines how long the pool will wait for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration
+
+ // DestroyTimeout defines how long the pool should wait for a worker to
+ // properly destroy; if the timeout is reached, the worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = int64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL uint64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL uint64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL uint64
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = 1
+ }
+}
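
A sketch of filling in a partial config; anything left zero falls back to the documented defaults. The chosen values are arbitrary, and the ExecTTL unit is whatever the supervisor implementation expects, which this hunk does not show:

package main

import (
    "fmt"

    "github.com/spiral/roadrunner/v2/pkg/pool"
)

func main() {
    cfg := pool.Config{
        NumWorkers: 8,
        Supervisor: &pool.SupervisorConfig{ExecTTL: 60},
    }
    cfg.InitDefaults()

    // AllocateTimeout and DestroyTimeout default to one minute,
    // Supervisor.WatchTick to 1.
    fmt.Println(cfg.AllocateTimeout, cfg.DestroyTimeout, cfg.Supervisor.WatchTick)
}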
diff --git a/static_pool.go b/pkg/pool/static_pool.go
index fbb2e5e8..6cc42143 100755
--- a/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -1,11 +1,17 @@
-package roadrunner
+package pool
import (
"context"
"os/exec"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/util"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
// StopRequest can be sent by worker to indicate that restart is required.
@@ -13,38 +19,35 @@ const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
-// Allocator is responsible for worker allocation in the pool
-type Allocator func() (WorkerBase, error)
-
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w WorkerBase) (Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error)
-// PoolBefore is set of functions that executes BEFORE Exec
-type Before func(req Payload) Payload
+// Before is a set of functions that execute BEFORE Exec
+type Before func(req internal.Payload) internal.Payload
-// PoolAfter is set of functions that executes AFTER Exec
-type After func(req Payload, resp Payload) Payload
+// After is a set of functions that execute AFTER Exec
+type After func(req internal.Payload, resp internal.Payload) internal.Payload
-type PoolOptions func(p *StaticPool)
+type Options func(p *StaticPool)
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- cfg PoolConfig
+ cfg Config
// worker command creator
cmd func() *exec.Cmd
// creates and connects to stack
- factory Factory
+ factory worker.Factory
// distributes the events
- events util.EventsHandler
+ events events.Handler
// manages worker states and TTLs
- ww WorkerWatcher
+ ww worker.Watcher
// allocate new worker
- allocator Allocator
+ allocator worker.Allocator
errEncoder ErrorEncoder
before []Before
@@ -52,7 +55,7 @@ type StaticPool struct {
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) {
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
const op = errors.Op("NewPool")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -68,13 +71,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo
cfg: cfg,
cmd: cmd,
factory: factory,
- events: util.NewEventsHandler(),
+ events: events2.NewEventsHandler(),
after: make([]After, 0, 0),
before: make([]Before, 0, 0),
}
p.allocator = newPoolAllocator(factory, cmd)
- p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
if err != nil {
@@ -105,38 +108,38 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo
return p, nil
}
-func PoolBefore(before ...Before) PoolOptions {
+func ExecBefore(before ...Before) Options {
return func(p *StaticPool) {
p.before = append(p.before, before...)
}
}
-func PoolAfter(after ...After) PoolOptions {
+func ExecAfter(after ...After) Options {
return func(p *StaticPool) {
p.after = append(p.after, after...)
}
}
// AddListener connects event listener to the pool.
-func (sp *StaticPool) AddListener(listener util.EventListener) {
+func (sp *StaticPool) AddListener(listener events.EventListener) {
sp.events.AddListener(listener)
}
-// PoolConfig returns associated pool configuration. Immutable.
-func (sp *StaticPool) GetConfig() PoolConfig {
+// Config returns associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []WorkerBase) {
+func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(wb WorkerBase) error {
+func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
-func (sp *StaticPool) Exec(p Payload) (Payload, error) {
+func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -145,10 +148,10 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
- sw := w.(SyncWorker)
+ sw := w.(worker.SyncWorker)
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
@@ -164,10 +167,10 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
// worker wants to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sw.State().Set(StateInvalid)
+ sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(p)
@@ -176,7 +179,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -191,16 +194,16 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
const op = errors.Op("exec with context")
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
- sw := w.(SyncWorker)
+ sw := w.(worker.SyncWorker)
// apply all before function
if len(sp.before) > 0 {
@@ -216,10 +219,10 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
// worker wants to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- sw.State().Set(StateInvalid)
+ sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
@@ -228,7 +231,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -244,13 +247,13 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
return rsp, nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means we can't get a worker from the stack within the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
return nil, errors.E(op, err)
}
// otherwise, if err is not nil, return the error
@@ -265,48 +268,48 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w WorkerBase) (Payload, error) {
+ return func(err error, w worker.BaseProcess) (internal.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
if errors.Is(errors.ErrSoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(StateInvalid)
+ w.State().Set(internal.StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
sp.ww.PushWorker(w)
}
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
- w.State().Set(StateInvalid)
- sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ w.State().Set(internal.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
- return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
}
-func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator {
- return func() (WorkerBase, error) {
+func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (worker.BaseProcess, error) {
w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
if err != nil {
return nil, err
}
- sw, err := NewSyncWorker(w)
+ sw, err := syncWorker.From(w)
if err != nil {
return nil, err
}
@@ -314,25 +317,25 @@ func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator {
}
}
-func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return EmptyPayload, err
+ return internal.Payload{}, err
}
- r, err := sw.(SyncWorker).Exec(p)
+ r, err := sw.(worker.SyncWorker).Exec(p)
if stopErr := sw.Stop(context.Background()); stopErr != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
return r, err
}
// allocate the required number of workers
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []WorkerBase
+ var workers []worker.BaseProcess
// a constant number of workers simplifies the logic
for i := int64(0); i < numWorkers; i++ {
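For orientation, here is a minimal consumer of the relocated pool API, assembled from the calls exercised in the tests below; the worker count and script path are illustrative assumptions rather than values from this change:

package main

import (
	"context"
	"log"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/internal"
	"github.com/spiral/roadrunner/v2/pkg/pipe"
	"github.com/spiral/roadrunner/v2/pkg/pool"
)

func main() {
	ctx := context.Background()

	// NewPool now lives in pkg/pool and takes its factory from pkg/pipe.
	p, err := pool.NewPool(
		ctx,
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		pipe.NewPipeFactory(),
		pool.Config{
			NumWorkers:      4,
			AllocateTimeout: time.Second * 5,
			DestroyTimeout:  time.Second * 5,
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Destroy(ctx)

	// Payload moved to internal; failures now return a zero-value
	// internal.Payload instead of the removed EmptyPayload sentinel.
	res, err := p.Exec(internal.Payload{Body: []byte("hello")})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(string(res.Body))
}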
diff --git a/static_pool_test.go b/pkg/pool/static_pool_test.go
index 33799c40..dd33a1a6 100755
--- a/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package pool
import (
"context"
@@ -12,10 +12,13 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
-var cfg = PoolConfig{
+var cfg = Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -25,8 +28,8 @@ func Test_NewPool(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -39,8 +42,8 @@ func Test_NewPool(t *testing.T) {
func Test_StaticPool_Invalid(t *testing.T) {
p, err := NewPool(
context.Background(),
- func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
+ pipe.NewPipeFactory(),
cfg,
)
@@ -51,9 +54,9 @@ func Test_StaticPool_Invalid(t *testing.T) {
func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p, err := NewPool(
context.Background(),
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -67,8 +70,8 @@ func Test_StaticPool_Echo(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -77,7 +80,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -91,8 +94,8 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -101,7 +104,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -115,8 +118,8 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -125,7 +128,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -139,8 +142,8 @@ func Test_StaticPool_JobError(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -148,7 +151,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -165,18 +168,18 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
assert.NotNil(t, p)
- block := make(chan struct{})
+ block := make(chan struct{}, 1)
p.AddListener(func(event interface{}) {
- if wev, ok := event.(WorkerEvent); ok {
- if wev.Event == EventWorkerLog {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerLog {
e := string(wev.Payload.([]byte))
if strings.ContainsAny(e, "undefined_function()") {
block <- struct{}{}
@@ -186,7 +189,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
})
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
@@ -200,8 +203,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -209,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -223,8 +226,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
p.AddListener(func(event interface{}) {
- if pe, ok := event.(PoolEvent); ok {
- if pe.Event == EventWorkerConstruct {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
wg.Done()
}
}
@@ -240,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
list := p.Workers()
for _, w := range list {
- assert.Equal(t, StateReady, w.State().Value())
+ assert.Equal(t, internal.StateReady, w.State().Value())
}
wg.Wait()
}
@@ -248,9 +251,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := NewPool(
context.Background(),
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
@@ -267,9 +270,9 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -284,11 +287,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(Payload{Body: []byte("hello")})
+ res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -304,9 +307,9 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -320,14 +323,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(Payload{Body: []byte("hello")})
+ res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -344,9 +347,9 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -360,14 +363,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(Payload{Body: []byte("hello")})
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -384,9 +387,9 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -397,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(Payload{Body: []byte("100")})
+ _, err = p.Exec(internal.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -406,9 +409,9 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -419,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(Payload{Body: []byte("100")})
+ _, err := p.Exec(internal.Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -427,7 +430,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.Exec(Payload{Body: []byte("100")})
+ _, err = p.Exec(internal.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -436,9 +439,9 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
context.Background(),
- func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -450,10 +453,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NotNil(t, p)
for _, w := range p.Workers() {
- w.State().Set(StateErrored)
+ w.State().Set(internal.StateErrored)
}
- _, err = p.Exec(Payload{Body: []byte("hello")})
+ _, err = p.Exec(internal.Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -461,9 +464,9 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p, err := NewPool(
context.Background(),
- func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -480,8 +483,8 @@ func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
cfg,
)
if err != nil {
@@ -491,7 +494,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -502,9 +505,9 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
p, _ := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -517,7 +520,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -532,9 +535,9 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
p, _ := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
- PoolConfig{
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -546,7 +549,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
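The listener pattern these tests rely on generalizes; a sketch of consuming the split event types, assuming only the interfaces this diff introduces (the logging itself is illustrative):

package example

import (
	"log"

	"github.com/spiral/roadrunner/v2/interfaces/events"
	"github.com/spiral/roadrunner/v2/interfaces/pool"
)

// attachLogging distinguishes the two event types that now live in
// interfaces/events instead of the root roadrunner package.
func attachLogging(p pool.Pool) {
	p.AddListener(func(event interface{}) {
		switch e := event.(type) {
		case events.PoolEvent:
			log.Printf("pool event: %v", e.Event)
		case events.WorkerEvent:
			if e.Event == events.EventWorkerLog {
				// for EventWorkerLog the payload carries raw stderr bytes
				log.Printf("worker stderr: %s", e.Payload.([]byte))
			}
		}
	})
}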
diff --git a/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index dfec5559..6d1f0c58 100755
--- a/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -1,4 +1,4 @@
-package roadrunner
+package pool
import (
"context"
@@ -6,27 +6,31 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/util"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
)
const MB = 1024 * 1024
-type SupervisedPool interface {
- Pool
+type Supervised interface {
+ pool.Pool
// Start starts the watching process for all pool workers
Start()
}
-type supervisedPool struct {
+type supervised struct {
cfg *SupervisorConfig
- events util.EventsHandler
- pool Pool
+ events events.Handler
+ pool pool.Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func newPoolWatcher(pool Pool, events util.EventsHandler, cfg *SupervisorConfig) SupervisedPool {
- sp := &supervisedPool{
+func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+ sp := &supervised{
cfg: cfg,
events: events,
pool: pool,
@@ -38,10 +42,10 @@ func newPoolWatcher(pool Pool, events util.EventsHandler, cfg *SupervisorConfig)
type ttlExec struct {
err error
- p Payload
+ p internal.Payload
}
-func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
const op = errors.Op("exec_supervised")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -55,7 +59,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
if err != nil {
c <- ttlExec{
err: errors.E(op, err),
- p: EmptyPayload,
+ p: internal.Payload{},
}
}
@@ -68,10 +72,10 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
for {
select {
case <-ctx.Done():
- return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err())
+ return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
case res := <-c:
if res.err != nil {
- return EmptyPayload, res.err
+ return internal.Payload{}, res.err
}
return res.p, nil
@@ -79,38 +83,38 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
}
}
-func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
+func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
const op = errors.Op("supervised exec")
rsp, err := sp.pool.Exec(p)
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
return rsp, nil
}
-func (sp *supervisedPool) AddListener(listener util.EventListener) {
+func (sp *supervised) AddListener(listener events.EventListener) {
sp.pool.AddListener(listener)
}
-func (sp *supervisedPool) GetConfig() PoolConfig {
+func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervisedPool) Workers() (workers []WorkerBase) {
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervisedPool) RemoveWorker(worker WorkerBase) error {
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
return sp.pool.RemoveWorker(worker)
}
-func (sp *supervisedPool) Destroy(ctx context.Context) {
+func (sp *supervised) Destroy(ctx context.Context) {
sp.pool.Destroy(ctx)
}
-func (sp *supervisedPool) Start() {
+func (sp *supervised) Start() {
go func() {
watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
for {
@@ -128,11 +132,11 @@ func (sp *supervisedPool) Start() {
}()
}
-func (sp *supervisedPool) Stop() {
+func (sp *supervised) Stop() {
sp.stopCh <- struct{}{}
}
-func (sp *supervisedPool) control() {
+func (sp *supervised) control() {
now := time.Now()
const op = errors.Op("supervised pool control tick")
@@ -140,11 +144,11 @@ func (sp *supervisedPool) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == StateInvalid {
+ if workers[i].State().Value() == internal.StateInvalid {
continue
}
- s, err := WorkerProcessState(workers[i])
+ s, err := roadrunner.WorkerProcessState(workers[i])
if err != nil {
// worker is no longer valid for supervision
continue
@@ -153,27 +157,27 @@ func (sp *supervisedPool) control() {
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
}
// first we check the worker idle TTL
if sp.cfg.IdleTTL != 0 {
// then check for the worker state
- if workers[i].State().Value() != StateReady {
+ if workers[i].State().Value() != internal.StateReady {
continue
}
@@ -194,10 +198,10 @@ func (sp *supervisedPool) control() {
if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
}
}
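The thresholds consumed by control() above are the whole supervision contract; a fragment (within package pool) sketching how they could be wired through newPoolWatcher, with units inferred from control() and with p and eventsHandler assumed to be in scope. The exact unit of ExecTTL is not visible in this hunk:

sup := &SupervisorConfig{
	WatchTick:       1,   // seconds between control() ticks
	TTL:             100, // max total worker lifetime, seconds
	IdleTTL:         10,  // max idle time for a StateReady worker, seconds
	ExecTTL:         10,  // per-call deadline used by ExecWithContext
	MaxWorkerMemory: 128, // soft memory cap, in MB (see the MB constant)
}
watcher := newPoolWatcher(p, eventsHandler, sup) // p is a pool.Pool, eventsHandler an events.Handler
watcher.Start()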
diff --git a/supervisor_test.go b/pkg/pool/supervisor_test.go
index d5d7d04c..2e3e7fd2 100644
--- a/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package pool
import (
"context"
@@ -6,10 +6,13 @@ import (
"testing"
"time"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
-var cfgSupervised = PoolConfig{
+var cfgSupervised = Config{
NumWorkers: int64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -26,8 +29,8 @@ func TestSupervisedPool_Exec(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/memleak.php", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
cfgSupervised,
)
@@ -44,7 +47,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
default:
workers := p.Workers()
if len(workers) > 0 {
- s, err := WorkerProcessState(workers[0])
+ s, err := roadrunner.WorkerProcessState(workers[0])
assert.NoError(t, err)
assert.NotNil(t, s)
// since this is soft limit, double max memory limit watch
@@ -58,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 50)
- _, err = p.Exec(Payload{
+ _, err = p.Exec(internal.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -69,7 +72,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = PoolConfig{
+ var cfgExecTTL = Config{
NumWorkers: int64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -84,8 +87,8 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
cfgExecTTL,
)
@@ -95,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.ExecWithContext(context.Background(), Payload{
+ resp, err := p.ExecWithContext(context.Background(), internal.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -109,7 +112,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = PoolConfig{
+ var cfgExecTTL = Config{
NumWorkers: int64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -124,8 +127,8 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
- func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
- NewPipeFactory(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
cfgExecTTL,
)
@@ -136,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(Payload{
+ resp, err := p.Exec(internal.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
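These tests poll per-worker stats through the helper that stays at the module root; a minimal sketch of that read path, with the MemoryUsage field taken from the control() loop above:

package example

import (
	"log"

	"github.com/spiral/roadrunner/v2"
	"github.com/spiral/roadrunner/v2/interfaces/worker"
)

// logUsage reads OS-level process state for a worker, as the supervisor
// does on every control() tick.
func logUsage(w worker.BaseProcess) {
	s, err := roadrunner.WorkerProcessState(w)
	if err != nil {
		return // the worker is no longer valid for supervision
	}
	log.Printf("pid %d uses %d bytes", w.Pid(), s.MemoryUsage)
}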
diff --git a/socket_factory.go b/pkg/socket/socket_factory.go
index e517c03f..f721ad66 100755
--- a/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -1,4 +1,4 @@
-package roadrunner
+package socket
import (
"context"
@@ -9,14 +9,17 @@ import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/goridge/v3"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
-// SocketFactory connects to external stack using socket server.
-type SocketFactory struct {
+// Factory connects to external stack using socket server.
+type Factory struct {
// listens for incoming connections from underlying processes
ls net.Listener
@@ -32,10 +35,10 @@ type SocketFactory struct {
// todo: review
-// NewSocketServer returns SocketFactory attached to a given socket listener.
+// NewSocketServer returns a Factory attached to a given socket listener.
// tout specifies how long the factory should wait for an incoming relay connection
-func NewSocketServer(ls net.Listener, tout time.Duration) Factory {
- f := &SocketFactory{
+func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory {
+ f := &Factory{
ls: ls,
tout: tout,
relays: sync.Map{},
@@ -53,7 +56,7 @@ func NewSocketServer(ls net.Listener, tout time.Duration) Factory {
}
// blocking operation, returns an error
-func (f *SocketFactory) listen() error {
+func (f *Factory) listen() error {
errGr := &errgroup.Group{}
errGr.Go(func() error {
for {
@@ -63,7 +66,7 @@ func (f *SocketFactory) listen() error {
}
rl := goridge.NewSocketRelay(conn)
- pid, err := fetchPID(rl)
+ pid, err := internal.FetchPID(rl)
if err != nil {
return err
}
@@ -76,18 +79,18 @@ func (f *SocketFactory) listen() error {
}
type socketSpawn struct {
- w WorkerBase
+ w worker.BaseProcess
err error
}
-// SpawnWorker creates WorkerProcess and connects it to appropriate relay or returns error
-func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) {
+// SpawnWorkerWithContext creates a Process and connects it to the appropriate relay, or returns an error
+func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
ctx, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := InitBaseWorker(cmd)
+ w, err := workerImpl.InitBaseWorker(cmd)
if err != nil {
c <- socketSpawn{
w: nil,
@@ -121,7 +124,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
}
w.AttachRelay(rl)
- w.State().Set(StateReady)
+ w.State().Set(internal.StateReady)
c <- socketSpawn{
w: w,
@@ -141,9 +144,9 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
}
}
-func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker")
- w, err := InitBaseWorker(cmd)
+ w, err := workerImpl.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
@@ -164,18 +167,18 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
w.AttachRelay(rl)
- w.State().Set(StateReady)
+ w.State().Set(internal.StateReady)
return w, nil
}
// Close socket factory and underlying socket connection.
-func (f *SocketFactory) Close(ctx context.Context) error {
+func (f *Factory) Close(ctx context.Context) error {
return f.ls.Close()
}
-// waits for WorkerProcess to connect over socket and returns associated relay of timeout
-func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) {
+// waits for the Process to connect over the socket and returns the associated relay, or times out
+func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) {
ticker := time.NewTicker(time.Millisecond * 100)
for {
select {
@@ -196,7 +199,7 @@ func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase)
}
}
-func (f *SocketFactory) findRelay(w WorkerBase) (*goridge.SocketRelay, error) {
+func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) {
const op = errors.Op("find_relay")
// poll every 1ms for the relay
pollDone := time.NewTimer(f.tout)
@@ -215,11 +218,11 @@ func (f *SocketFactory) findRelay(w WorkerBase) (*goridge.SocketRelay, error) {
}
// stores the relay associated with a specific pid
-func (f *SocketFactory) attachRelayToPid(pid int64, relay goridge.Relay) {
+func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) {
f.relays.Store(pid, relay)
}
// deletes the relay associated with a specific pid
-func (f *SocketFactory) removeRelayFromPid(pid int64) {
+func (f *Factory) removeRelayFromPid(pid int64) {
f.relays.Delete(pid)
}
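From the caller's side, spawning over the relocated factory is unchanged; a sketch matching the test calls below, with the address and script as illustrative assumptions:

package main

import (
	"context"
	"log"
	"net"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/socket"
)

func main() {
	ctx := context.Background()

	ls, err := net.Listen("tcp", "localhost:9007")
	if err != nil {
		log.Fatal(err)
	}

	// NewSocketServer still returns the worker.Factory interface,
	// now from pkg/socket.
	f := socket.NewSocketServer(ls, time.Minute)

	cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
	w, err := f.SpawnWorkerWithContext(ctx, cmd)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := w.Stop(ctx); err != nil {
			log.Printf("error stopping the Process: %v", err)
		}
	}()
}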
diff --git a/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index bbe8cc31..f1a7d637 100755
--- a/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package socket
import (
"context"
@@ -8,6 +8,8 @@ import (
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -27,7 +29,7 @@ func Test_Tcp_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
assert.NoError(t, err)
@@ -39,7 +41,7 @@ func Test_Tcp_Start(t *testing.T) {
err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
+ t.Errorf("error stopping the Process: error %v", err)
}
}
@@ -52,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
f := NewSocketServer(ls, time.Minute)
defer func() {
@@ -68,7 +70,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
+ t.Errorf("error stopping the Process: error %v", err)
}
}
@@ -87,7 +89,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
err = cmd.Start()
if err != nil {
t.Errorf("error executing the command: error %v", err)
@@ -114,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/failboot.php")
+ cmd := exec.Command("php", "../../tests/failboot.php")
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
@@ -137,7 +139,7 @@ func Test_Tcp_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0")
+ cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
@@ -160,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/invalid.php")
+ cmd := exec.Command("php", "../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
@@ -182,7 +184,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "broken", "tcp")
+ cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -204,12 +206,12 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err2)
}()
- sw, err := NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
- res, err := sw.Exec(Payload{Body: []byte("hello")})
+ res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -231,7 +233,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
go func() {
@@ -240,16 +242,16 @@ func Test_Tcp_Echo(t *testing.T) {
defer func() {
err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
+ t.Errorf("error stopping the Process: error %v", err)
}
}()
- sw, err := NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
- res, err := sw.Exec(Payload{Body: []byte("hello")})
+ res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -273,7 +275,7 @@ func Test_Unix_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
assert.NoError(t, err)
@@ -285,7 +287,7 @@ func Test_Unix_Start(t *testing.T) {
err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
+ t.Errorf("error stopping the Process: error %v", err)
}
}
@@ -303,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/failboot.php")
+ cmd := exec.Command("php", "../../tests/failboot.php")
w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
@@ -325,7 +327,7 @@ func Test_Unix_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "unix", "200", "0")
+ cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
@@ -347,7 +349,7 @@ func Test_Unix_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/invalid.php")
+ cmd := exec.Command("php", "../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
@@ -368,7 +370,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "broken", "unix")
+ cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -389,12 +391,12 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
- res, err := sw.Exec(Payload{Body: []byte("hello")})
+ res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -416,7 +418,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -428,16 +430,16 @@ func Test_Unix_Echo(t *testing.T) {
defer func() {
err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
+ t.Errorf("error stopping the Process: error %v", err)
}
}()
- sw, err := NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
t.Fatal(err)
}
- res, err := sw.Exec(Payload{Body: []byte("hello")})
+ res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -463,7 +465,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
w, err := f.SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -475,7 +477,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the WorkerProcess: error %v", err)
+ b.Errorf("error stopping the Process: error %v", err)
}
}
}
@@ -494,7 +496,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -503,17 +505,17 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
defer func() {
err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the WorkerProcess: error %v", err)
+ b.Errorf("error stopping the Process: error %v", err)
}
}()
- sw, err := NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
b.Fatal(err)
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -535,7 +537,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
w, err := f.SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -543,7 +545,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
}
err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the WorkerProcess: error %v", err)
+ b.Errorf("error stopping the Process: error %v", err)
}
}
}
@@ -562,7 +564,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
if err != nil {
@@ -571,17 +573,17 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
defer func() {
err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the WorkerProcess: error %v", err)
+ b.Errorf("error stopping the Process: error %v", err)
}
}()
- sw, err := NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
b.Fatal(err)
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/sync_worker.go b/pkg/worker/sync_worker.go
index 94a804a7..1eb1396e 100755
--- a/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -1,4 +1,4 @@
-package roadrunner
+package worker
import (
"bytes"
@@ -6,110 +6,101 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/util"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
"go.uber.org/multierr"
"github.com/spiral/goridge/v3"
)
-var EmptyPayload = Payload{}
-
-type SyncWorker interface {
- // WorkerBase provides basic functionality for the SyncWorker
- WorkerBase
- // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs Payload) (Payload, error)
- // ExecWithContext used to handle Exec with TTL
- ExecWithContext(ctx context.Context, p Payload) (Payload, error)
-}
-
type syncWorker struct {
- w WorkerBase
+ w worker.BaseProcess
}
-// NewSyncWorker creates SyncWorker from WorkerBasa
-func NewSyncWorker(w WorkerBase) (SyncWorker, error) {
+// From creates a SyncWorker from a BaseProcess
+func From(w worker.BaseProcess) (worker.SyncWorker, error) {
return &syncWorker{
w: w,
}, nil
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
const op = errors.Op("sync worker Exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return EmptyPayload, errors.E(op, errors.Str("payload can not be empty"))
+ return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.w.State().Value() != StateReady {
- return EmptyPayload, errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()))
+ if tw.w.State().Value() != internal.StateReady {
+ return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
}
// set last used time
tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(StateWorking)
+ tw.w.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.ErrSoftJob, err) == false {
- tw.w.State().Set(StateErrored)
+ tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
- return EmptyPayload, err
+ return internal.Payload{}, err
}
- tw.w.State().Set(StateReady)
+ tw.w.State().Set(internal.StateReady)
tw.w.State().RegisterExec()
return rsp, nil
}
type wexec struct {
- payload Payload
+ payload internal.Payload
err error
}
// ExecWithContext executes the payload, enforcing a TTL via the provided context.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) {
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) {
const op = errors.Op("ExecWithContext")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: EmptyPayload,
+ payload: internal.Payload{},
err: errors.E(op, errors.Str("payload can not be empty")),
}
return
}
- if tw.w.State().Value() != StateReady {
+ if tw.w.State().Value() != internal.StateReady {
c <- wexec{
- payload: EmptyPayload,
- err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())),
+ payload: internal.Payload{},
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
}
return
}
// set last used time
tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(StateWorking)
+ tw.w.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.ErrSoftJob, err) == false {
- tw.w.State().Set(StateErrored)
+ tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
c <- wexec{
- payload: EmptyPayload,
+ payload: internal.Payload{},
err: errors.E(op, err),
}
return
}
- tw.w.State().Set(StateReady)
+ tw.w.State().Set(internal.StateReady)
tw.w.State().RegisterExec()
c <- wexec{
@@ -122,18 +113,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
- return EmptyPayload, multierr.Append(err, ctx.Err())
+ return internal.Payload{}, multierr.Append(err, ctx.Err())
}
- return EmptyPayload, ctx.Err()
+ return internal.Payload{}, ctx.Err()
case res := <-c:
if res.err != nil {
- return EmptyPayload, res.err
+ return internal.Payload{}, res.err
}
return res.payload, nil
}
}
-func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
+func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) {
const op = errors.Op("exec payload")
frame := goridge.NewFrame()
@@ -156,35 +147,35 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
err := tw.Relay().Send(frame)
if err != nil {
- return EmptyPayload, err
+ return internal.Payload{}, err
}
frameR := goridge.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return internal.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return EmptyPayload, errors.E(op, errors.Str("nil frame received"))
+ return internal.Payload{}, errors.E(op, errors.Str("nil frame received"))
}
if !frameR.VerifyCRC() {
- return EmptyPayload, errors.E(op, errors.Str("failed to verify CRC"))
+ return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&byte(goridge.ERROR) != byte(0) {
- return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return EmptyPayload, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := Payload{}
+ payload := internal.Payload{}
payload.Context = frameR.Payload()[:options[0]]
payload.Body = frameR.Payload()[options[0]:]
@@ -203,11 +194,11 @@ func (tw *syncWorker) Created() time.Time {
return tw.w.Created()
}
-func (tw *syncWorker) AddListener(listener util.EventListener) {
+func (tw *syncWorker) AddListener(listener events.EventListener) {
tw.w.AddListener(listener)
}
-func (tw *syncWorker) State() State {
+func (tw *syncWorker) State() internal.State {
return tw.w.State()
}
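With NewSyncWorker replaced by From, the TTL path is driven entirely by the caller's context; a sketch, assuming w is a spawned and started worker.BaseProcess:

package example

import (
	"context"
	"log"
	"time"

	"github.com/spiral/roadrunner/v2/interfaces/worker"
	"github.com/spiral/roadrunner/v2/internal"
	workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
)

// execWithTTL wraps a BaseProcess in the sync API and applies a one-second
// deadline; on ctx.Done() ExecWithContext kills the worker (see above).
func execWithTTL(w worker.BaseProcess) {
	sw, err := workerImpl.From(w)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	res, err := sw.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")})
	if err != nil {
		log.Println(err) // a deadline hit surfaces ctx.Err(); the worker was killed
		return
	}
	log.Println(string(res.Body))
}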
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
new file mode 100755
index 00000000..e224e105
--- /dev/null
+++ b/pkg/worker/sync_worker_test.go
@@ -0,0 +1,37 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NotStarted_String(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+ assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "inactive")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_NotStarted_Exec(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+
+ syncWorker, err := From(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Contains(t, err.Error(), "Process is not ready (inactive)")
+}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
new file mode 100755
index 00000000..35d3264e
--- /dev/null
+++ b/pkg/worker/worker.go
@@ -0,0 +1,302 @@
+package worker
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ "go.uber.org/multierr"
+)
+
+const (
+	// WaitDuration - how long the error buffer aggregates error messages
+	// after the last update before the output is merged (keeps related errors together).
+ WaitDuration = 25 * time.Millisecond
+
+ // ReadBufSize used to make a slice with specified length to read from stderr
+	ReadBufSize = 10240 // 10KB
+)
+
+var syncPool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+}
+
+// Process - a supervised process with an API over goridge.Relay.
+type Process struct {
+ // created indicates at what time Process has been created.
+ created time.Time
+
+ // updates parent supervisor or pool about Process events
+ events events.Handler
+
+	// state holds information about the current Process state,
+	// the number of Process executions, and the last status change time.
+	// Publicly this object is receive-only, protected by a mutex
+	// and an atomic counter.
+ state *internal.WorkerState
+
+	// underlying command with its associated process; the command must be
+	// provided to Process from outside, in non-started form. Its stderr
+	// is captured by Process to aggregate error messages.
+ cmd *exec.Cmd
+
+	// pid of the underlying process; stays zero until the process is started.
+ pid int
+
+	// stderr aggregates stderr output from the underlying process. The value can be
+	// received only once the command has completed and all pipes are closed.
+ stderr *bytes.Buffer
+
+ // channel is being closed once command is complete.
+ // waitDone chan interface{}
+
+	// contains information about the resulting process state.
+ endState *os.ProcessState
+
+	// ensures that only one execution can run at a time.
+ mu sync.RWMutex
+
+ // communication bus with underlying process.
+ relay goridge.Relay
+	// rd is the read end of the pipe used to consume stderr
+	rd io.Reader
+	// stop signals the stderr reader to terminate
+	stop chan struct{}
+}
+
+// InitBaseWorker creates a new Process over the given exec.Cmd.
+func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
+ if cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
+ }
+ w := &Process{
+ created: time.Now(),
+ events: events2.NewEventsHandler(),
+ cmd: cmd,
+ state: internal.NewWorkerState(internal.StateInactive),
+ stderr: new(bytes.Buffer),
+ stop: make(chan struct{}, 1),
+ }
+
+ w.rd, w.cmd.Stderr = io.Pipe()
+
+	// buffer grow optimization:
+	// at this point we know that stderr may contain large messages
+ w.stderr.Grow(ReadBufSize)
+
+ go func() {
+ w.watch()
+ }()
+
+ return w, nil
+}
+
+// Pid returns worker pid.
+func (w *Process) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns time worker was created at.
+func (w *Process) Created() time.Time {
+ return w.created
+}
+
+// AddListener registers new worker event listener.
+func (w *Process) AddListener(listener events.EventListener) {
+ w.events.AddListener(listener)
+}
+
+// State returns a receive-only Process state object; the state can be used to safely access
+// the Process status, the time the status changed, and the number of Process executions.
+func (w *Process) State() internal.State {
+ return w.state
+}
+
+// AttachRelay attaches the goridge.Relay used to communicate with the
+// underlying process.
+func (w *Process) AttachRelay(rl goridge.Relay) {
+ w.relay = rl
+}
+
+// Relay returns the goridge.Relay attached to the Process.
+func (w *Process) Relay() goridge.Relay {
+ return w.relay
+}
+
+// String returns the Process description; implements the fmt.Stringer interface.
+func (w *Process) String() string {
+ st := w.state.String()
+ // we can safely compare pid to 0
+ if w.pid != 0 {
+ st = st + ", pid:" + strconv.Itoa(w.pid)
+ }
+
+ return fmt.Sprintf(
+ "(`%s` [%s], numExecs: %v)",
+ strings.Join(w.cmd.Args, " "),
+ st,
+ w.state.NumExecs(),
+ )
+}
+
+func (w *Process) Start() error {
+ err := w.cmd.Start()
+ if err != nil {
+ return err
+ }
+ w.pid = w.cmd.Process.Pid
+ return nil
+}
+
+// Wait must be called once for each Process; the call returns once the Process is
+// complete, together with the process error (if any). If stderr is present, its value
+// is wrapped into the returned error. The method also returns an error if the php
+// process fails to find or start the script.
+func (w *Process) Wait() error {
+ const op = errors.Op("worker process wait")
+ err := multierr.Combine(w.cmd.Wait())
+
+	// at this point, according to the documentation (see the cmd.Wait comment),
+	// if the worker finished with an error the message is written to stderr first
+	// and then w.cmd.Wait returns an error
+ w.endState = w.cmd.ProcessState
+ if err != nil {
+ w.state.Set(internal.StateErrored)
+
+ w.mu.RLock()
+		// if the process exit code is > 0, stderr will contain the error (if present)
+ if w.stderr.Len() > 0 {
+ err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr buffer
+ w.stop <- struct{}{}
+ }
+ w.mu.RUnlock()
+
+ return multierr.Append(err, w.closeRelay())
+ }
+
+ err = multierr.Append(err, w.closeRelay())
+ if err != nil {
+ w.state.Set(internal.StateErrored)
+ return err
+ }
+
+ if w.endState.Success() {
+ w.state.Set(internal.StateStopped)
+ }
+
+ return nil
+}
+
+func (w *Process) closeRelay() error {
+ if w.relay != nil {
+ err := w.relay.Close()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Stop sends a soft termination command to the Process and waits for process completion.
+func (w *Process) Stop(ctx context.Context) error {
+ c := make(chan error)
+
+ go func() {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+		if err != nil {
+			w.state.Set(internal.StateKilling)
+			c <- multierr.Append(err, w.cmd.Process.Kill())
+			// without this return the goroutine would fall through and block
+			// forever on the second, unreceived send to c
+			return
+		}
+ w.state.Set(internal.StateStopped)
+ c <- nil
+ }()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-c:
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+}
+
+// Kill kills the underlying process; make sure to call Wait() to gather the
+// error log from stderr. Does not wait for process completion!
+func (w *Process) Kill() error {
+ w.state.Set(internal.StateKilling)
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ w.state.Set(internal.StateStopped)
+ return nil
+}
+
+// put returns the pointer to the pool instead of allocating a new slice:
+// reset the slice length, restore its full capacity, then hand it back
+func (w *Process) put(data *[]byte) {
+ *data = (*data)[:0]
+ *data = (*data)[:cap(*data)]
+
+ syncPool.Put(data)
+}
+
+// get returns a pointer to a pooled byte slice
+func (w *Process) get() *[]byte {
+ return syncPool.Get().(*[]byte)
+}
+
+// watch reads stderr in a loop, publishing each chunk as a worker log event and
+// mirroring it into the stderr buffer, until a stop signal arrives.
+func (w *Process) watch() {
+ go func() {
+ for {
+ select {
+ case <-w.stop:
+ buf := w.get()
+			// read any remaining data
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ return
+ default:
+			// read at most ReadBufSize bytes of stderr per read
+ buf := w.get()
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ }
+ }
+ }()
+}
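
The get/put pair above is the standard sync.Pool idiom: pool *[]byte pointers so that Put does not allocate, and reset length and capacity on return. A minimal self-contained sketch of the same idiom (the buffer size here is assumed for illustration; the worker package defines its own ReadBufSize):

    package main

    import (
        "fmt"
        "sync"
    )

    const ReadBufSize = 10240 // assumed value for illustration only

    var syncPool = sync.Pool{
        New: func() interface{} {
            buf := make([]byte, ReadBufSize)
            return &buf // store a pointer so Get/Put avoid copying the slice header
        },
    }

    func get() *[]byte { return syncPool.Get().(*[]byte) }

    func put(data *[]byte) {
        *data = (*data)[:0]          // drop stale length
        *data = (*data)[:cap(*data)] // restore full capacity for the next reader
        syncPool.Put(data)
    }

    func main() {
        buf := get()
        n := copy(*buf, "stderr chunk")
        fmt.Printf("read %d bytes: %q\n", n, (*buf)[:n])
        put(buf)
    }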
diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go
new file mode 100755
index 00000000..805f66b5
--- /dev/null
+++ b/pkg/worker/worker_test.go
@@ -0,0 +1,19 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_OnStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ assert.Nil(t, cmd.Start())
+
+ w, err := InitBaseWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
+}
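
The test above pins down the contract of the new constructor: InitBaseWorker refuses a command that is already running, because the worker owns the process lifecycle. A hedged sketch of the intended order, using only the API shown in this diff:

    package main

    import (
        "log"
        "os/exec"

        "github.com/spiral/roadrunner/v2/pkg/worker"
    )

    func main() {
        cmd := exec.Command("php", "tests/client.php", "echo", "pipes")

        // attach first, while the process is not yet running
        w, err := worker.InitBaseWorker(cmd)
        if err != nil {
            log.Fatal(err)
        }
        // the worker, not the caller, starts the command
        if err := w.Start(); err != nil {
            log.Fatal(err)
        }
        // Wait drains stderr into the error buffer and returns the exit error
        log.Println(w.Wait())
    }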
diff --git a/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index f8fb67a9..8788e509 100755
--- a/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -1,4 +1,4 @@
-package roadrunner
+package worker_watcher //nolint:golint,stylecheck
import (
"context"
@@ -7,11 +7,14 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/util"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
- workers []WorkerBase
+ workers []worker.BaseProcess
mutex sync.RWMutex
destroy bool
actualNumOfWorkers int64
@@ -20,7 +23,7 @@ type Stack struct {
func NewWorkersStack() *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]WorkerBase, 0, w),
+ workers: make([]worker.BaseProcess, 0, w),
actualNumOfWorkers: 0,
}
}
@@ -34,7 +37,7 @@ func (stack *Stack) Reset() {
// Push worker back to the stack
// If stack in destroy state, Push will provide 100ms window to unlock the mutex
-func (stack *Stack) Push(w WorkerBase) {
+func (stack *Stack) Push(w worker.BaseProcess) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
stack.actualNumOfWorkers++
@@ -47,7 +50,7 @@ func (stack *Stack) IsEmpty() bool {
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (WorkerBase, bool) {
+func (stack *Stack) Pop() (worker.BaseProcess, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
@@ -84,14 +87,13 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []WorkerBase {
+func (stack *Stack) Workers() []worker.BaseProcess {
stack.mutex.Lock()
defer stack.mutex.Unlock()
- workersCopy := make([]WorkerBase, 0, 1)
+ workersCopy := make([]worker.BaseProcess, 0, 1)
// copy
for _, v := range stack.workers {
- sw := v.(SyncWorker)
- workersCopy = append(workersCopy, sw)
+ workersCopy = append(workersCopy, v)
}
return workersCopy
@@ -126,7 +128,7 @@ func (stack *Stack) Destroy(ctx context.Context) {
stack.mutex.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
- stack.workers[i].State().Set(StateDestroyed)
+ stack.workers[i].State().Set(internal.StateDestroyed)
}
stack.mutex.Unlock()
tt.Stop()
@@ -137,31 +139,8 @@ func (stack *Stack) Destroy(ctx context.Context) {
}
}
-type WorkerWatcher interface {
- // AddToWatch used to add stack to wait its state
- AddToWatch(workers []WorkerBase) error
-
- // GetFreeWorker provide first free worker
- GetFreeWorker(ctx context.Context) (WorkerBase, error)
-
- // PutWorker enqueues worker back
- PushWorker(w WorkerBase)
-
- // AllocateNew used to allocate new worker and put in into the WorkerWatcher
- AllocateNew() error
-
- // Destroy destroys the underlying stack
- Destroy(ctx context.Context)
-
- // WorkersList return all stack w/o removing it from internal storage
- WorkersList() []WorkerBase
-
- // RemoveWorker remove worker from the stack
- RemoveWorker(wb WorkerBase) error
-}
-
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func newWorkerWatcher(allocator Allocator, numWorkers int64, events util.EventsHandler) WorkerWatcher {
+func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -176,29 +155,29 @@ func newWorkerWatcher(allocator Allocator, numWorkers int64, events util.EventsH
type workerWatcher struct {
mutex sync.RWMutex
stack *Stack
- allocator Allocator
+ allocator worker.Allocator
initialNumWorkers int64
actualNumWorkers int64
- events util.EventsHandler
+ events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []WorkerBase) error {
+func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- sw, err := NewSyncWorker(workers[i])
+ sw, err := syncWorker.From(workers[i])
if err != nil {
return err
}
ww.stack.Push(sw)
sw.AddListener(ww.events.Push)
- go func(swc WorkerBase) {
+ go func(swc worker.BaseProcess) {
ww.wait(swc)
}(sw)
}
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("GetFreeWorker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -208,7 +187,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == StateRemove {
+ if w != nil && w.State().Value() == internal.StateRemove {
err := ww.RemoveWorker(w)
if err != nil {
return nil, err
@@ -250,15 +229,15 @@ func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Unlock()
ww.PushWorker(sw)
- ww.events.Push(PoolEvent{
- Event: EventWorkerConstruct,
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventWorkerConstruct,
Payload: sw,
})
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
+func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -266,7 +245,7 @@ func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(StateInvalid)
+ wb.State().Set(internal.StateInvalid)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -274,12 +253,12 @@ func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
return nil
}
- wb.State().Set(StateRemove)
+ wb.State().Set(internal.StateRemove)
return nil
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w WorkerBase) {
+func (ww *workerWatcher) PushWorker(w worker.BaseProcess) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -292,22 +271,22 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []WorkerBase {
+func (ww *workerWatcher) WorkersList() []worker.BaseProcess {
return ww.stack.Workers()
}
-func (ww *workerWatcher) wait(w WorkerBase) {
+func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("process wait")
err := w.Wait()
if err != nil {
- ww.events.Push(WorkerEvent{
- Event: EventWorkerError,
+ ww.events.Push(events.WorkerEvent{
+ Event: events.EventWorkerError,
Worker: w,
Payload: errors.E(op, err),
})
}
- if w.State().Value() == StateDestroyed {
+ if w.State().Value() == internal.StateDestroyed {
// worker was manually destroyed, no need to replace
return
}
@@ -315,14 +294,14 @@ func (ww *workerWatcher) wait(w WorkerBase) {
_ = ww.stack.FindAndRemoveByPid(w.Pid())
err = ww.AllocateNew()
if err != nil {
- ww.events.Push(PoolEvent{
- Event: EventPoolError,
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventPoolError,
Payload: errors.E(op, err),
})
}
}
-func (ww *workerWatcher) addToWatch(wb WorkerBase) {
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
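
Taken together, the exported NewWorkerWatcher composes with the new packages roughly as follows. This is a sketch, not documented usage: the worker.Allocator signature does not appear in this hunk and is assumed to be func() (worker.BaseProcess, error), and handler stands for any events.Handler implementation:

    // assumed signature; only the type name worker.Allocator appears in this diff
    allocator := func() (worker.BaseProcess, error) {
        return factory.SpawnWorkerWithContext(context.Background(),
            exec.Command("php", "tests/client.php", "echo", "pipes"))
    }

    ww := worker_watcher.NewWorkerWatcher(allocator, int64(runtime.NumCPU()), handler)

    w, err := ww.GetFreeWorker(context.Background()) // pops a free worker from the stack
    if err != nil {
        return err
    }
    defer ww.PushWorker(w) // O(1) return to the stack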
diff --git a/plugins/http/config.go b/plugins/http/config.go
index d6efe310..00d2940b 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
)
type Cidrs []*net.IPNet
@@ -56,7 +56,7 @@ type Config struct {
Uploads *UploadsConfig
// Pool configures worker pool.
- Pool *roadrunner.PoolConfig
+ Pool *poolImpl.Config
// Env is environment variables passed to the http pool
Env map[string]string
@@ -141,7 +141,7 @@ func (c *Config) EnableFCGI() bool {
func (c *Config) InitDefaults() error {
if c.Pool == nil {
// default pool
- c.Pool = &roadrunner.PoolConfig{
+ c.Pool = &poolImpl.Config{
Debug: false,
NumWorkers: int64(runtime.NumCPU()),
MaxJobs: 1000,
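
For plugin authors the move from roadrunner.PoolConfig to poolImpl.Config is a pure rename; the defaults above keep the same fields. A typical explicit configuration, limited to fields that appear elsewhere in this diff:

    cfg := poolImpl.Config{
        Debug:           false,
        NumWorkers:      int64(runtime.NumCPU()),
        MaxJobs:         1000, // recycle a worker after this many executions
        AllocateTimeout: time.Second * 10,
        DestroyTimeout:  time.Second * 10,
        Supervisor: &poolImpl.SupervisorConfig{ // optional, as in the test plugins below
            WatchTick: 60,
            TTL:       1000,
            IdleTTL:   10,
        },
    }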
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index 74b038ff..57590bfd 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -10,9 +10,9 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
- "github.com/spiral/roadrunner/v2/util"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
)
const (
@@ -23,8 +23,10 @@ const (
EventError
)
+const MB = 1024 * 1024
+
type Handle interface {
- AddListener(l util.EventListener)
+ AddListener(l events.EventListener)
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
@@ -71,17 +73,17 @@ type handler struct {
uploads UploadsConfig
trusted Cidrs
log log.Logger
- pool roadrunner.Pool
+ pool pool.Pool
mul sync.Mutex
- lsn util.EventListener
+ lsn events.EventListener
}
-func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool roadrunner.Pool) (Handle, error) {
+func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) {
if pool == nil {
return nil, errors.E(errors.Str("pool should be initialized"))
}
return &handler{
- maxRequestSize: maxReqSize * roadrunner.MB,
+ maxRequestSize: maxReqSize * MB,
uploads: uploads,
pool: pool,
trusted: trusted,
@@ -89,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool ro
}
// Listen attaches handler event controller.
-func (h *handler) AddListener(l util.EventListener) {
+func (h *handler) AddListener(l events.EventListener) {
h.mul.Lock()
defer h.mul.Unlock()
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 13299da1..460263f6 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -15,10 +15,13 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
- factory "github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/interfaces/status"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/util"
@@ -47,17 +50,17 @@ type Plugin struct {
sync.Mutex
configurer config.Configurer
- server factory.Server
+ server server.Server
log log.Logger
cfg *Config
// middlewares to chain
mdwr middleware
- // Event listener to stdout
- listener util.EventListener
+ // WorkerEvent listener to stdout
+ listener events.EventListener
// Pool which attached to all servers
- pool roadrunner.Pool
+ pool pool.Pool
// servers RR handler
handler Handle
@@ -69,7 +72,7 @@ type Plugin struct {
}
// AddListener attaches server event controller.
-func (s *Plugin) AddListener(listener util.EventListener) {
+func (s *Plugin) AddListener(listener events.EventListener) {
// save listeners for Reset
s.listener = listener
s.pool.AddListener(listener)
@@ -77,7 +80,7 @@ func (s *Plugin) AddListener(listener util.EventListener) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server server.Server) error {
const op = errors.Op("http Init")
err := cfg.UnmarshalKey(PluginName, &s.cfg)
if err != nil {
@@ -97,7 +100,7 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Serv
return errors.E(op, errors.Disabled)
}
- s.pool, err = server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{
Debug: s.cfg.Pool.Debug,
NumWorkers: s.cfg.Pool.NumWorkers,
MaxJobs: s.cfg.Pool.MaxJobs,
@@ -122,8 +125,8 @@ func (s *Plugin) logCallback(event interface{}) {
s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr)
case ErrorEvent:
s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error)
- case roadrunner.WorkerEvent:
- s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.State())
+ case events.WorkerEvent:
+ s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.(worker.BaseProcess).State())
default:
fmt.Println(event)
}
@@ -284,7 +287,7 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Server returns associated pool workers
-func (s *Plugin) Workers() []roadrunner.WorkerBase {
+func (s *Plugin) Workers() []worker.BaseProcess {
return s.pool.Workers()
}
@@ -305,7 +308,7 @@ func (s *Plugin) Reset() error {
return errors.E(op, err)
}
- s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{
Debug: s.cfg.Pool.Debug,
NumWorkers: s.cfg.Pool.NumWorkers,
MaxJobs: s.cfg.Pool.MaxJobs,
diff --git a/plugins/http/request.go b/plugins/http/request.go
index 640bdec2..5df79b7d 100644
--- a/plugins/http/request.go
+++ b/plugins/http/request.go
@@ -9,8 +9,8 @@ import (
"strings"
j "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
)
@@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) {
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
// files prior to calling this method.
-func (r *Request) Payload() (roadrunner.Payload, error) {
- p := roadrunner.Payload{}
+func (r *Request) Payload() (internal.Payload, error) {
+ p := internal.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return roadrunner.EmptyPayload, err
+ return internal.Payload{}, err
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return roadrunner.EmptyPayload, err
+ return internal.Payload{}, err
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/plugins/http/response.go b/plugins/http/response.go
index e3ac2756..9700a16c 100644
--- a/plugins/http/response.go
+++ b/plugins/http/response.go
@@ -6,7 +6,7 @@ import (
"strings"
"sync"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/internal"
)
// Response handles PSR7 response logic.
@@ -23,7 +23,7 @@ type Response struct {
}
// NewResponse creates new response based on given pool payload.
-func NewResponse(p roadrunner.Payload) (*Response, error) {
+func NewResponse(p internal.Payload) (*Response, error) {
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {
return nil, err
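
NewResponse now consumes internal.Payload directly: Context carries the marshaled PSR-7 metadata, Body the raw body. A minimal construction mirroring the response tests below (writing the response out is assumed from TestNewResponse_Write):

    p := internal.Payload{
        Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
        Body:    []byte(`sample body`),
    }

    rsp, err := http2.NewResponse(p) // http2 aliases plugins/http, as in the tests
    if err != nil {
        return err
    }
    _ = rsp // in a handler, rsp is then written to the http.ResponseWriter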
diff --git a/plugins/http/tests/configs/.rr-http.yaml b/plugins/http/tests/configs/.rr-http.yaml
index c6868f8c..e2e361cf 100644
--- a/plugins/http/tests/configs/.rr-http.yaml
+++ b/plugins/http/tests/configs/.rr-http.yaml
@@ -24,19 +24,6 @@ http:
maxJobs: 0
allocateTimeout: 60s
destroyTimeout: 60s
-
- ssl:
- port: 8892
- redirect: false
- cert: fixtures/server.crt
- key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:7921
- http2:
- enabled: false
- h2c: false
- maxConcurrentStreams: 128
logs:
mode: development
level: error
diff --git a/plugins/http/tests/handler_test.go b/plugins/http/tests/handler_test.go
index 0c6a39ef..54a4ae80 100644
--- a/plugins/http/tests/handler_test.go
+++ b/plugins/http/tests/handler_test.go
@@ -10,7 +10,8 @@ import (
"runtime"
"strings"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
@@ -21,10 +22,10 @@ import (
)
func TestHandler_Echo(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -72,10 +73,10 @@ func Test_HandlerErrors(t *testing.T) {
}
func TestHandler_Headers(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "header", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -133,10 +134,10 @@ func TestHandler_Headers(t *testing.T) {
}
func TestHandler_Empty_User_Agent(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -193,10 +194,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}
func TestHandler_User_Agent(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -253,10 +254,10 @@ func TestHandler_User_Agent(t *testing.T) {
}
func TestHandler_Cookies(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "cookie", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -318,10 +319,10 @@ func TestHandler_Cookies(t *testing.T) {
}
func TestHandler_JsonPayload_POST(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -382,10 +383,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
}
func TestHandler_JsonPayload_PUT(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -442,10 +443,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
}
func TestHandler_JsonPayload_PATCH(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -502,10 +503,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
}
func TestHandler_FormData_POST(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -575,10 +576,10 @@ func TestHandler_FormData_POST(t *testing.T) {
}
func TestHandler_FormData_POST_Overwrite(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -648,10 +649,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
}
func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -720,10 +721,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
}
func TestHandler_FormData_PUT(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -792,10 +793,10 @@ func TestHandler_FormData_PUT(t *testing.T) {
}
func TestHandler_FormData_PATCH(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -864,10 +865,10 @@ func TestHandler_FormData_PATCH(t *testing.T) {
}
func TestHandler_Multipart_POST(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -978,10 +979,10 @@ func TestHandler_Multipart_POST(t *testing.T) {
}
func TestHandler_Multipart_PUT(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1092,10 +1093,10 @@ func TestHandler_Multipart_PUT(t *testing.T) {
}
func TestHandler_Multipart_PATCH(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1208,10 +1209,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
}
func TestHandler_Error(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1254,10 +1255,10 @@ func TestHandler_Error(t *testing.T) {
}
func TestHandler_Error2(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error2", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1300,10 +1301,10 @@ func TestHandler_Error2(t *testing.T) {
}
func TestHandler_Error3(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "pid", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1359,10 +1360,10 @@ func TestHandler_Error3(t *testing.T) {
}
func TestHandler_ResponseDuration(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1420,10 +1421,10 @@ func TestHandler_ResponseDuration(t *testing.T) {
}
func TestHandler_ResponseDurationDelayed(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echoDelay", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1480,10 +1481,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
}
func TestHandler_ErrorDuration(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1554,10 +1555,10 @@ func TestHandler_IP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1615,10 +1616,10 @@ func TestHandler_XRealIP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1681,10 +1682,10 @@ func TestHandler_XForwardedFor(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1746,10 +1747,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1794,10 +1795,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
}
func BenchmarkHandler_Listen_Echo(b *testing.B) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index c8dd4b38..1a61597c 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -19,6 +19,7 @@ import (
"github.com/spiral/endure"
"github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/mocks"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
@@ -821,7 +822,7 @@ func TestHttpMiddleware(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
- tt := time.NewTimer(time.Second * 15)
+ tt := time.NewTimer(time.Second * 20)
go func() {
defer wg.Done()
@@ -900,7 +901,7 @@ func TestHttpEchoErr(t *testing.T) {
mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1")
mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any())
- mockLogger.EXPECT().Debug("worker event received", "event", roadrunner.EventWorkerLog, "worker state", gomock.Any())
+ mockLogger.EXPECT().Debug("worker event received", "event", events.EventWorkerLog, "worker state", gomock.Any())
err = cont.RegisterAll(
cfg,
diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go
index 2bfe7d56..a526fe03 100644
--- a/plugins/http/tests/response_test.go
+++ b/plugins/http/tests/response_test.go
@@ -6,7 +6,7 @@ import (
"net/http"
"testing"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/internal"
http2 "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
)
@@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
}
func TestNewResponse_Error(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)})
+ r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)})
assert.Error(t, err)
assert.Nil(t, r)
}
func TestNewResponse_Write(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
Body: []byte(`sample body`),
})
@@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) {
}
func TestNewResponse_Stream(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) {
}
func TestNewResponse_StreamError(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) {
}
func TestWrite_HandlesPush(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
})
@@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) {
}
func TestWrite_HandlesTrailers(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
})
@@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
}
func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
- r, err := http2.NewResponse(roadrunner.Payload{
+ r, err := http2.NewResponse(internal.Payload{
Context: []byte(
`{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
})
diff --git a/plugins/http/tests/uploads_test.go b/plugins/http/tests/uploads_test.go
index d36d4793..f255ec91 100644
--- a/plugins/http/tests/uploads_test.go
+++ b/plugins/http/tests/uploads_test.go
@@ -16,7 +16,8 @@ import (
"time"
j "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
)
@@ -26,10 +27,10 @@ var json = j.ConfigCompatibleWithStandardLibrary
const testFile = "uploads_test.go"
func TestHandler_Upload_File(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -109,10 +110,10 @@ func TestHandler_Upload_File(t *testing.T) {
}
func TestHandler_Upload_NestedFile(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -192,10 +193,10 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
}
func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -275,10 +276,10 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
}
func TestHandler_Upload_File_Forbids(t *testing.T) {
- pool, err := roadrunner.NewPool(context.Background(),
+ pool, err := poolImpl.NewPool(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") },
- roadrunner.NewPipeFactory(),
- roadrunner.PoolConfig{
+ pipe.NewPipeFactory(),
+ poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index f3013394..449be085 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -3,9 +3,9 @@ package informer
import (
"github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/informer"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
)
const PluginName = "informer"
@@ -21,8 +21,8 @@ func (p *Plugin) Init(log log.Logger) error {
return nil
}
-// Workers provides WorkerBase slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) ([]roadrunner.WorkerBase, error) {
+// Workers provides a BaseProcess slice with workers for the requested plugin
+func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
const op = errors.Op("get workers")
svc, ok := p.registry[name]
if !ok {
diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go
index 473b6de7..3fdefde3 100644
--- a/plugins/informer/tests/test_plugin.go
+++ b/plugins/informer/tests/test_plugin.go
@@ -4,17 +4,18 @@ import (
"context"
"time"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
)
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
@@ -48,7 +49,7 @@ func (p1 *Plugin1) Name() string {
return "informer.plugin1"
}
-func (p1 *Plugin1) Workers() []roadrunner.WorkerBase {
+func (p1 *Plugin1) Workers() []worker.BaseProcess {
pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
diff --git a/plugins/reload/tests/reload_plugin_test.go b/plugins/reload/tests/reload_plugin_test.go
index 8fb9474f..d2fad28d 100644
--- a/plugins/reload/tests/reload_plugin_test.go
+++ b/plugins/reload/tests/reload_plugin_test.go
@@ -386,7 +386,7 @@ func TestReloadCopy3k(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
- tt := time.NewTimer(time.Second * 180)
+ tt := time.NewTimer(time.Second * 220)
go func() {
defer wg.Done()
@@ -429,7 +429,7 @@ func TestReloadCopy3k(t *testing.T) {
t.Run("ReloadMake3kFiles", reloadMake3kFiles)
ttt := time.Now()
t.Run("ReloadCopyFiles", reloadCopyFiles)
- if time.Since(ttt).Seconds() > 100 {
+ if time.Since(ttt).Seconds() > 120 {
t.Fatal("spend too much time on copy")
}
diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go
index 9f48a43f..1d770e70 100644
--- a/plugins/resetter/tests/test_plugin.go
+++ b/plugins/resetter/tests/test_plugin.go
@@ -4,17 +4,17 @@ import (
"context"
"time"
- "github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
)
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ea6d42eb..e6003fbc 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -8,9 +8,14 @@ import (
"strings"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/socket"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
@@ -21,7 +26,7 @@ const PluginName = "server"
type Plugin struct {
cfg Config
log log.Logger
- factory roadrunner.Factory
+ factory worker.Factory
}
// Init application provider.
@@ -93,7 +98,7 @@ func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.BaseProcess, error) {
const op = errors.Op("new worker")
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -111,13 +116,13 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env server.Env) (pool.Pool, error) {
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, err
}
- p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt)
+ p, err := poolImpl.NewPool(ctx, spawnCmd, server.factory, opt)
if err != nil {
return nil, err
}
@@ -128,10 +133,10 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConf
}
// creates relay and worker factory.
-func (server *Plugin) initFactory() (roadrunner.Factory, error) {
+func (server *Plugin) initFactory() (worker.Factory, error) {
const op = errors.Op("network factory init")
if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
- return roadrunner.NewPipeFactory(), nil
+ return pipe.NewPipeFactory(), nil
}
dsn := strings.Split(server.cfg.Relay, "://")
@@ -147,9 +152,9 @@ func (server *Plugin) initFactory() (roadrunner.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
case "tcp":
- return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
@@ -165,12 +170,12 @@ func (server *Plugin) setEnv(e server.Env) []string {
}
func (server *Plugin) collectLogs(event interface{}) {
- if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
- case roadrunner.EventWorkerError:
- server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
- case roadrunner.EventWorkerLog:
- server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
+ case events.EventWorkerError:
+ server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
+ case events.EventWorkerLog:
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
}
}
}
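
Since events.WorkerEvent now carries the worker as an interface{}, listeners must assert worker.BaseProcess before touching Pid or State, exactly as collectLogs does above. A listener in the same shape, attachable through any AddListener shown in this diff (the plugin variable is hypothetical):

    plugin.AddListener(func(event interface{}) {
        we, ok := event.(events.WorkerEvent)
        if !ok {
            return
        }
        switch we.Event {
        case events.EventWorkerError:
            log.Println("worker error:", we.Payload.(error), "pid:", we.Worker.(worker.BaseProcess).Pid())
        case events.EventWorkerLog:
            log.Printf("pid %d: %s", we.Worker.(worker.BaseProcess).Pid(), we.Payload.([]byte))
        }
    })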
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index fbd37e12..61c9a8f9 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -5,8 +5,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -14,12 +17,12 @@ import (
const ConfigSection = "server"
const Response = "test"
-var testPoolConfig = roadrunner.PoolConfig{
+var testPoolConfig = poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
+ Supervisor: &poolImpl.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
@@ -31,7 +34,7 @@ var testPoolConfig = roadrunner.PoolConfig{
type Foo struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error {
@@ -44,7 +47,7 @@ func (f *Foo) Serve() chan error {
const op = errors.Op("serve")
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -78,7 +81,7 @@ func (f *Foo) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
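
The pipes test plugin pairs the two renames: internal.Payload for the echo body and worker.From (formerly roadrunner.NewSyncWorker) to wrap a raw process in a synchronous executor. The implied round trip, hedged because the Exec call itself falls outside this hunk:

    r := internal.Payload{Body: []byte(Response)} // test payload for echo

    sw, err := worker.From(w) // w is a started worker.BaseProcess
    if err != nil {
        return err
    }
    rsp, err := sw.Exec(r) // Exec is assumed unchanged apart from the Payload move
    if err != nil {
        return err
    }
    if string(rsp.Body) != Response {
        return errors.Str("echo mismatch")
    }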
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index 4942d4c5..3b97efff 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -4,8 +4,10 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -13,7 +15,7 @@ import (
type Foo2 struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error {
@@ -29,7 +31,7 @@ func (f *Foo2) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -59,7 +61,7 @@ func (f *Foo2) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 89757a02..2857dadc 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -4,8 +4,10 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -13,7 +15,7 @@ import (
type Foo3 struct {
configProvider config.Configurer
wf server.Server
- pool roadrunner.Pool
+ pool pool.Pool
}
func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error {
@@ -29,7 +31,7 @@ func (f *Foo3) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := roadrunner.Payload{
+ r := internal.Payload{
Context: nil,
Body: []byte(Response),
}
@@ -59,7 +61,7 @@ func (f *Foo3) Serve() chan error {
}
// test that our worker is functional
- sw, err := roadrunner.NewSyncWorker(w)
+ sw, err := worker.From(w)
if err != nil {
errCh <- err
return errCh
diff --git a/process_state.go b/process.go
index 1291a904..3efcdbc5 100755..100644
--- a/process_state.go
+++ b/process.go
@@ -3,6 +3,7 @@ package roadrunner
import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
)
// ProcessState provides information about specific worker.
@@ -25,7 +26,7 @@ type ProcessState struct {
}
// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w WorkerBase) (ProcessState, error) {
+func WorkerProcessState(w worker.BaseProcess) (ProcessState, error) {
const op = errors.Op("worker_process state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
@@ -41,18 +42,3 @@ func WorkerProcessState(w WorkerBase) (ProcessState, error) {
MemoryUsage: i.RSS,
}, nil
}
-
-// ServerState returns list of all worker states of a given rr server.
-func PoolState(pool Pool) ([]ProcessState, error) {
- result := make([]ProcessState, 0)
- for _, w := range pool.Workers() {
- state, err := WorkerProcessState(w)
- if err != nil {
- return nil, err
- }
-
- result = append(result, state)
- }
-
- return result, nil
-}
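
PoolState is removed outright rather than relocated; callers that need a snapshot of the whole pool can rebuild it from the surviving WorkerProcessState and pool.Workers(), which now yields []worker.BaseProcess:

    // equivalent of the removed PoolState, written at the call site
    states := make([]roadrunner.ProcessState, 0, len(p.Workers()))
    for _, w := range p.Workers() {
        st, err := roadrunner.WorkerProcessState(w)
        if err != nil {
            return nil, err
        }
        states = append(states, st)
    }
    return states, nil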
diff --git a/state_test.go b/state_test.go
deleted file mode 100755
index 10547a4b..00000000
--- a/state_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package roadrunner
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NewState(t *testing.T) {
- st := newState(StateErrored)
-
- assert.Equal(t, "errored", st.String())
-
- assert.Equal(t, "inactive", newState(StateInactive).String())
- assert.Equal(t, "ready", newState(StateReady).String())
- assert.Equal(t, "working", newState(StateWorking).String())
- assert.Equal(t, "stopped", newState(StateStopped).String())
- assert.Equal(t, "undefined", newState(1000).String())
-}
-
-func Test_IsActive(t *testing.T) {
- assert.False(t, newState(StateInactive).IsActive())
- assert.True(t, newState(StateReady).IsActive())
- assert.True(t, newState(StateWorking).IsActive())
- assert.False(t, newState(StateStopped).IsActive())
- assert.False(t, newState(StateErrored).IsActive())
-}
diff --git a/sync_worker_test.go b/sync_worker_test.go
deleted file mode 100755
index 0ef1e0cd..00000000
--- a/sync_worker_test.go
+++ /dev/null
@@ -1,263 +0,0 @@
-package roadrunner
-
-import (
- "context"
- "os/exec"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Echo(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_BadPayload(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- res, err := syncWorker.Exec(EmptyPayload)
-
- assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
-
- assert.Contains(t, err.Error(), "payload can not be empty")
-}
-
-func Test_NotStarted_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := InitBaseWorker(cmd)
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
- assert.Contains(t, w.String(), "inactive")
- assert.Contains(t, w.String(), "numExecs: 0")
-}
-
-func Test_NotStarted_Exec(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := InitBaseWorker(cmd)
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
-
- assert.Contains(t, err.Error(), "WorkerProcess is not ready (inactive)")
-}
-
-func Test_String(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
- assert.Contains(t, w.String(), "ready")
- assert.Contains(t, w.String(), "numExecs: 0")
-}
-
-func Test_Echo_Slow(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10")
-
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Broken(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
-
- data := ""
- mu := &sync.Mutex{}
- w.AddListener(func(event interface{}) {
- if wev, ok := event.(WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- })
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
- assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
-
- time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
- t.Fail()
- }
- mu.Unlock()
- assert.Error(t, w.Stop(ctx))
-}
-
-func Test_Error(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "error", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- defer func() {
- err := w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
- assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
-
- if errors.Is(errors.ErrSoftJob, err) == false {
- t.Fatal("error should be of type errors.ErrSoftJob")
- }
- assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello")
-}
-
-func Test_NumExecs(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
- }()
-
- syncWorker, err := NewSyncWorker(w)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, int64(1), w.State().NumExecs())
-
- _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, int64(2), w.State().NumExecs())
-
- _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, int64(3), w.State().NumExecs())
-}
diff --git a/util/events.go b/util/events.go
deleted file mode 100755
index 43a3e646..00000000
--- a/util/events.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package util
-
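-// EventsHandler broadcasts events to the registered listeners.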
-type EventsHandler interface {
- NumListeners() int
- AddListener(listener EventListener)
- Push(e interface{})
-}
-
-// EventListener listens for events produced by a worker, worker pool or other service.
-type EventListener func(event interface{})
-
-// EventHandler helps to broadcast events to multiple listeners.
-type EventHandler struct {
- listeners []EventListener
-}
-
-func NewEventsHandler() EventsHandler {
- return &EventHandler{listeners: make([]EventListener, 0, 2)}
-}
-
-// NumListeners returns number of event listeners.
-func (eb *EventHandler) NumListeners() int {
- return len(eb.listeners)
-}
-
-// AddListener registers new event listener.
-func (eb *EventHandler) AddListener(listener EventListener) {
- eb.listeners = append(eb.listeners, listener)
-}
-
-// Push broadcasts an event across all event listeners.
-func (eb *EventHandler) Push(e interface{}) {
- for _, listener := range eb.listeners {
- listener(e)
- }
-}
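-
-// A minimal usage sketch (illustrative): a listener is registered once and
-// every Push fans the event out to all registered listeners in order.
-//
-//	eh := NewEventsHandler()
-//	eh.AddListener(func(event interface{}) {
-//		fmt.Printf("got event: %v\n", event)
-//	})
-//	eh.Push("hello") // invokes every registered listener with "hello"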
diff --git a/worker.go b/worker.go
deleted file mode 100755
index 07c1e5c8..00000000
--- a/worker.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package roadrunner
-
-import (
- "bytes"
- "context"
- "fmt"
- "io"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/util"
-
- "github.com/spiral/goridge/v3"
- "go.uber.org/multierr"
-)
-
-const (
- // WaitDuration - for how long the error buffer should keep aggregating error
- // messages after the last update before merging the output (keeps related error lines together).
- WaitDuration = 25 * time.Millisecond
-
- // ReadBufSize is the length of the slice used to read from stderr
- ReadBufSize = 10240 // 10 KB
-)
-
-// Worker events, emitted by the WorkerProcess to its listeners.
-const (
- // EventWorkerError is triggered on a WorkerProcess error. Expect the payload to be an error.
- EventWorkerError Event = iota + 200
-
- // EventWorkerLog is triggered on every write to the WorkerProcess stderr pipe (batched). Expect the payload to be a []byte string.
- EventWorkerLog
-)
-
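-// Event is a numeric identifier of a worker event type.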
-type Event int64
-
-func (ev Event) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- }
- return "Unknown event type"
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see the Event constants above.
- Event Event
-
- // Worker that triggered the event.
- Worker WorkerBase
-
- // Event specific payload.
- Payload interface{}
-}
-
-var pool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
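-
-// Note: the pool stores *[]byte rather than []byte on purpose; putting a
-// slice into an interface{} directly would allocate on every Put, while a
-// pointer to the slice avoids that per-call allocation.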
-
-type WorkerBase interface {
- fmt.Stringer
-
- // Pid returns worker pid.
- Pid() int64
-
- // Created returns time worker was created at.
- Created() time.Time
-
- // AddListener attaches listener to consume worker events.
- AddListener(listener util.EventListener)
-
- // State returns a receive-only WorkerProcess state object; the state can be used to safely access
- // the WorkerProcess status, the time when the status changed, and the number of WorkerProcess executions.
- State() State
-
- // Start runs the Cmd and returns immediately
- Start() error
-
- // Wait must be called once for each WorkerProcess; the call is released once the WorkerProcess
- // completes and returns the process error (if any). If stderr is present, its value
- // is wrapped as a WorkerError. The method returns an error code if the php process fails
- // to find or start the script.
- Wait() error
-
- // Stop sends soft termination command to the WorkerProcess and waits for process completion.
- Stop(ctx context.Context) error
-
- // Kill kills the underlying process; make sure to call Wait() to gather
- // the error log from stderr. Does not wait for process completion!
- Kill() error
-
- // Relay returns the goridge relay attached to the worker
- Relay() goridge.Relay
-
- // AttachRelay attaches a goridge relay to the worker process
- AttachRelay(rl goridge.Relay)
-}
-
-// WorkerProcess is a supervised process with an API over goridge.Relay.
-type WorkerProcess struct {
- // created indicates at what time WorkerProcess has been created.
- created time.Time
-
- // updates parent supervisor or pool about WorkerProcess events
- events util.EventsHandler
-
- // state holds information about the current WorkerProcess state,
- // the number of WorkerProcess executions, and the status change time.
- // Publicly this object is receive-only and is protected by a Mutex
- // and an atomic counter.
- state *state
-
- // underlying command with associated process; the command must be
- // provided to the WorkerProcess from outside in non-started form. Its
- // stderr stream is handled by the WorkerProcess to aggregate error messages.
- cmd *exec.Cmd
-
- // pid of the underlying process; it is 0 while
- // the process is not started.
- pid int
-
- // stderr aggregates stderr output from the underlying process. The value can be
- // read only once the command has completed and all pipes are closed.
- stderr *bytes.Buffer
-
- // channel is being closed once command is complete.
- // waitDone chan interface{}
-
- // contains information about the resulting process state.
- endState *os.ProcessState
-
- // ensures that only one execution can run at a time.
- mu sync.RWMutex
-
- // communication bus with underlying process.
- relay goridge.Relay
- rd io.Reader
- stop chan struct{}
-}
-
-// InitBaseWorker creates a new WorkerProcess over the given exec.Cmd.
-func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
- if cmd.Process != nil {
- return nil, fmt.Errorf("can't attach to running process")
- }
- w := &WorkerProcess{
- created: time.Now(),
- events: &util.EventHandler{},
- cmd: cmd,
- state: newState(StateInactive),
- stderr: new(bytes.Buffer),
- stop: make(chan struct{}, 1),
- }
-
- w.rd, w.cmd.Stderr = io.Pipe()
-
- // pre-grow the buffer:
- // at this point we know that stderr may contain large messages
- w.stderr.Grow(ReadBufSize)
-
- w.watch()
-
- return w, nil
-}
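-
-// A minimal lifecycle sketch (the php command and script path are illustrative,
-// taken from the tests):
-//
-//	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-//	w, err := InitBaseWorker(cmd)
-//	if err != nil {
-//		log.Fatal(err)
-//	}
-//	if err := w.Start(); err != nil {
-//		log.Fatal(err)
-//	}
-//	go func() { _ = w.Wait() }() // release the process once it completes
-//	defer func() { _ = w.Stop(context.Background()) }()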
-
-// Pid returns worker pid.
-func (w *WorkerProcess) Pid() int64 {
- return int64(w.pid)
-}
-
-// Created returns time worker was created at.
-func (w *WorkerProcess) Created() time.Time {
- return w.created
-}
-
-// AddListener registers new worker event listener.
-func (w *WorkerProcess) AddListener(listener util.EventListener) {
- w.events.AddListener(listener)
-}
-
-// State returns a receive-only WorkerProcess state object; the state can be used to safely access
-// the WorkerProcess status, the time when the status changed, and the number of WorkerProcess executions.
-func (w *WorkerProcess) State() State {
- return w.state
-}
-
-// AttachRelay attaches a goridge relay to the WorkerProcess.
-func (w *WorkerProcess) AttachRelay(rl goridge.Relay) {
- w.relay = rl
-}
-
-// Relay returns the goridge relay attached to the WorkerProcess.
-func (w *WorkerProcess) Relay() goridge.Relay {
- return w.relay
-}
-
-// String returns the WorkerProcess description (implements fmt.Stringer).
-func (w *WorkerProcess) String() string {
- st := w.state.String()
- // we can safely compare pid to 0
- if w.pid != 0 {
- st = st + ", pid:" + strconv.Itoa(w.pid)
- }
-
- return fmt.Sprintf(
- "(`%s` [%s], numExecs: %v)",
- strings.Join(w.cmd.Args, " "),
- st,
- w.state.NumExecs(),
- )
-}
-
-func (w *WorkerProcess) Start() error {
- err := w.cmd.Start()
- if err != nil {
- return err
- }
- w.pid = w.cmd.Process.Pid
- return nil
-}
-
-// Wait must be called once for each WorkerProcess; the call is released once the WorkerProcess
-// completes and returns the process error (if any). If stderr is present, its value
-// is wrapped as a WorkerError. The method returns an error code if the php process fails
-// to find or start the script.
-func (w *WorkerProcess) Wait() error {
- const op = errors.Op("worker process wait")
- err := w.cmd.Wait()
-
- // at this point, according to the documentation (see the cmd.Wait comment),
- // if the worker finishes with an error the message is written to stderr first
- // and then w.cmd.Wait returns an error
- w.endState = w.cmd.ProcessState
- if err != nil {
- w.state.Set(StateErrored)
-
- w.mu.RLock()
- // if the process exit code is > 0, there will be an error from stderr (if present)
- if w.stderr.Len() > 0 {
- err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
- // signal the stderr watcher to flush and stop
- w.stop <- struct{}{}
- }
- w.mu.RUnlock()
-
- return multierr.Append(err, w.closeRelay())
- }
-
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(StateErrored)
- return err
- }
-
- if w.endState.Success() {
- w.state.Set(StateStopped)
- }
-
- return nil
-}
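-
-// Note on the error path above: the send on w.stop tells watch() to perform
-// one final read of the stderr pipe, publish it as EventWorkerLog and exit,
-// so no trailing output is lost.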
-
-func (w *WorkerProcess) closeRelay() error {
- if w.relay != nil {
- err := w.relay.Close()
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-// Stop sends soft termination command to the WorkerProcess and waits for process completion.
-func (w *WorkerProcess) Stop(ctx context.Context) error {
- c := make(chan error)
-
- go func() {
- var err error
- w.state.Set(StateStopping)
- err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
- if err != nil {
- w.state.Set(StateKilling)
- c <- multierr.Append(err, w.cmd.Process.Kill())
- }
- w.state.Set(StateStopped)
- c <- nil
- }()
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- if err != nil {
- return err
- }
- return nil
- }
-}
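-
-// A minimal sketch of bounding Stop with a deadline; if the worker does not
-// stop in time, the select above returns ctx.Err() to the caller.
-//
-//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-//	defer cancel()
-//	if err := w.Stop(ctx); err != nil {
-//		log.Printf("stop failed: %v", err)
-//	}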
-
-// Kill kills the underlying process; make sure to call Wait() to gather
-// the error log from stderr. Does not wait for process completion!
-func (w *WorkerProcess) Kill() error {
- w.state.Set(StateKilling)
- err := w.cmd.Process.Signal(os.Kill)
- if err != nil {
- return err
- }
- w.state.Set(StateStopped)
- return nil
-}
-
-// put returns the pointer to the pool without allocating a new slice:
-// the length is reset and then restored to capacity before it is put back
-func (w *WorkerProcess) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- pool.Put(data)
-}
-
-// get returns a pointer to a byte slice from the pool
-func (w *WorkerProcess) get() *[]byte {
- return pool.Get().(*[]byte)
-}
-
-// watch starts a goroutine that reads stderr of the underlying process in
-// ReadBufSize chunks, publishes every chunk as an EventWorkerLog event and
-// appends it to the aggregated stderr buffer until the stop signal arrives.
-func (w *WorkerProcess) watch() {
- go func() {
- for {
- select {
- case <-w.stop:
- buf := w.get()
- // read the last data
- n, _ := w.rd.Read(*buf)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- return
- default:
- // read at most ReadBufSize (10 KB) of stderr per read
- buf := w.get()
- n, _ := w.rd.Read(*buf)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- }
- }
- }()
-}
diff --git a/worker_test.go b/worker_test.go
deleted file mode 100755
index e82d383e..00000000
--- a/worker_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package roadrunner
-
-import (
- "context"
- "os/exec"
- "sync"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_GetState(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- assert.Equal(t, StateStopped, w.State().Value())
- }()
-
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- assert.Equal(t, StateReady, w.State().Value())
- err = w.Stop(ctx)
- if err != nil {
- t.Errorf("error stopping the WorkerProcess: error %v", err)
- }
-}
-
-func Test_Kill(t *testing.T) {
- ctx := context.Background()
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- assert.Error(t, w.Wait())
- // TODO changed from stopped, discuss
- assert.Equal(t, StateErrored, w.State().Value())
- }()
-
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- assert.Equal(t, StateReady, w.State().Value())
- err = w.Kill()
- if err != nil {
- t.Errorf("error killing the WorkerProcess: error %v", err)
- }
- wg.Wait()
-}
-
-func Test_OnStarted(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
- assert.Nil(t, cmd.Start())
-
- w, err := InitBaseWorker(cmd)
- assert.Nil(t, w)
- assert.NotNil(t, err)
-
- assert.Equal(t, "can't attach to running process", err.Error())
-}