summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
committerValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
commit7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch)
tree3200cf2136f7413a7e1cfc6ecdaa83716f9655f9
parentee5d34abde7f3931bf939498eb7a8cb170232f4f (diff)
interfaces folder deprecated
-rwxr-xr-xMakefile12
-rwxr-xr-xgo.sum1
-rw-r--r--interfaces/pool/pool.go100
-rw-r--r--pkg/doc/pool_workflow.drawio1
-rw-r--r--pkg/doc/workflow.drawio1
-rwxr-xr-xpkg/events/general.go (renamed from pkg/events/events.go)10
-rw-r--r--pkg/events/interface.go (renamed from interfaces/events/handler.go)0
-rw-r--r--pkg/events/pool_events.go (renamed from interfaces/events/pool_events.go)0
-rw-r--r--pkg/events/worker_events.go (renamed from interfaces/events/worker_events.go)0
-rw-r--r--pkg/pool/config.go10
-rw-r--r--pkg/pool/interface.go29
-rwxr-xr-xpkg/pool/static_pool.go73
-rwxr-xr-xpkg/pool/static_pool_test.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go15
-rw-r--r--pkg/pool/supervisor_test.go2
-rw-r--r--pkg/transport/interface.go (renamed from interfaces/worker/factory.go)9
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go (renamed from pkg/pipe/pipe_factory.go)17
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go (renamed from pkg/pipe/pipe_factory_spawn_test.go)134
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go (renamed from pkg/pipe/pipe_factory_test.go)128
-rwxr-xr-xpkg/transport/socket/socket_factory.go (renamed from pkg/socket/socket_factory.go)17
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go (renamed from pkg/socket/socket_factory_spawn_test.go)64
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go (renamed from pkg/socket/socket_factory_test.go)66
-rw-r--r--pkg/worker/interface.go (renamed from interfaces/worker/worker.go)3
-rwxr-xr-xpkg/worker/sync_worker.go111
-rwxr-xr-xpkg/worker/sync_worker_test.go7
-rwxr-xr-xpkg/worker/worker.go10
-rw-r--r--pkg/worker_watcher/interface.go (renamed from interfaces/worker/watcher.go)18
-rw-r--r--pkg/worker_watcher/stack.go18
-rw-r--r--pkg/worker_watcher/stack_test.go53
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go22
-rw-r--r--plugins/http/handler.go4
-rw-r--r--plugins/http/plugin.go16
-rw-r--r--plugins/informer/interface.go2
-rw-r--r--plugins/informer/plugin.go2
-rw-r--r--plugins/informer/rpc.go2
-rw-r--r--plugins/server/interface.go8
-rw-r--r--plugins/server/plugin.go17
-rw-r--r--tests/plugins/http/handler_test.go218
-rw-r--r--tests/plugins/http/http_plugin_test.go2
-rw-r--r--tests/plugins/http/uploads_test.go2
-rw-r--r--tests/plugins/informer/test_plugin.go18
-rw-r--r--tests/plugins/server/plugin_pipes.go8
-rw-r--r--tests/plugins/server/plugin_sockets.go8
-rw-r--r--tests/plugins/server/plugin_tcp.go8
-rw-r--r--tools/process.go2
45 files changed, 548 insertions, 704 deletions
diff --git a/Makefile b/Makefile
index b2321d59..d61d2bc6 100755
--- a/Makefile
+++ b/Makefile
@@ -27,9 +27,9 @@ test_coverage:
docker-compose -f tests/docker-compose.yaml up -d
rm -rf coverage
mkdir coverage
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/pipe
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/transport/pipe
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/socket.out -covermode=atomic ./pkg/transport/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/socket.out -covermode=atomic ./pkg/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
@@ -58,9 +58,9 @@ test_coverage:
test: ## Run application tests
docker-compose -f tests/docker-compose.yaml up -d
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pipe
+ go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe
+ go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/socket
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http
@@ -88,9 +88,9 @@ test: ## Run application tests
test_1.14: ## Run application tests
docker-compose -f tests/docker-compose.yaml up -d
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pipe
+ go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe
+ go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket
go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool
- go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/socket
go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker
go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher
go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http
diff --git a/go.sum b/go.sum
index fa1c2cdf..368e7fd0 100755
--- a/go.sum
+++ b/go.sum
@@ -311,6 +311,7 @@ github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM=
github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8=
github.com/spiral/goridge/v3 v3.0.0/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
+github.com/spiral/roadrunner v1.9.2 h1:jGtXs3r5fevdbrkDF8BdFxEY4rIZwplnns1oWj7Vyw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go
deleted file mode 100644
index 97cc945c..00000000
--- a/interfaces/pool/pool.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package pool
-
-import (
- "context"
- "runtime"
- "time"
-
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- "github.com/spiral/roadrunner/v2/pkg/payload"
-)
-
-// Pool managed set of inner worker processes.
-type Pool interface {
- // GetConfig returns pool configuration.
- GetConfig() interface{}
-
- // Exec executes task with payload
- Exec(rqs payload.Payload) (payload.Payload, error)
-
- // ExecWithContext executes task with context which is used with timeout
- ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
-
- // Workers returns worker list associated with the pool.
- Workers() (workers []worker.BaseProcess)
-
- // Remove worker from the pool.
- RemoveWorker(worker worker.BaseProcess) error
-
- // Destroy all underlying stack (but let them to complete the task).
- Destroy(ctx context.Context)
-}
-
-// Configures the pool behaviour.
-type Config struct {
- // Debug flag creates new fresh worker before every request.
- Debug bool
-
- // NumWorkers defines how many sub-processes can be run at once. This value
- // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
- NumWorkers int64
-
- // MaxJobs defines how many executions is allowed for the worker until
- // it's destruction. set 1 to create new process for each new task, 0 to let
- // worker handle as many tasks as it can.
- MaxJobs int64
-
- // AllocateTimeout defines for how long pool will be waiting for a worker to
- // be freed to handle the task. Defaults to 60s.
- AllocateTimeout time.Duration
-
- // DestroyTimeout defines for how long pool should be waiting for worker to
- // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
- DestroyTimeout time.Duration
-
- // Supervision config to limit worker and pool memory usage.
- Supervisor *SupervisorConfig
-}
-
-// InitDefaults enables default config values.
-func (cfg *Config) InitDefaults() {
- if cfg.NumWorkers == 0 {
- cfg.NumWorkers = int64(runtime.NumCPU())
- }
-
- if cfg.AllocateTimeout == 0 {
- cfg.AllocateTimeout = time.Minute
- }
-
- if cfg.DestroyTimeout == 0 {
- cfg.DestroyTimeout = time.Minute
- }
- if cfg.Supervisor == nil {
- return
- }
- cfg.Supervisor.InitDefaults()
-}
-
-type SupervisorConfig struct {
- // WatchTick defines how often to check the state of worker.
- WatchTick uint64
-
- // TTL defines maximum time worker is allowed to live.
- TTL uint64
-
- // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64
-
- // ExecTTL defines maximum lifetime per job.
- ExecTTL uint64
-
- // MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
-}
-
-// InitDefaults enables default config values.
-func (cfg *SupervisorConfig) InitDefaults() {
- if cfg.WatchTick == 0 {
- cfg.WatchTick = 1
- }
-}
diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio
new file mode 100644
index 00000000..fd78d5a5
--- /dev/null
+++ b/pkg/doc/pool_workflow.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-01-23T19:14:50.556Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="3_IXXh0-4hROHlpEF-EL" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE=</diagram></mxfile> \ No newline at end of file
diff --git a/pkg/doc/workflow.drawio b/pkg/doc/workflow.drawio
new file mode 100644
index 00000000..d32d7b2d
--- /dev/null
+++ b/pkg/doc/workflow.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-01-23T19:13:52.763Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="OK65Xj3LZBjWN0qK7Y8T" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE=</diagram></mxfile> \ No newline at end of file
diff --git a/pkg/events/events.go b/pkg/events/general.go
index 226a0c91..a09a8759 100755
--- a/pkg/events/events.go
+++ b/pkg/events/general.go
@@ -2,18 +2,16 @@ package events
import (
"sync"
-
- "github.com/spiral/roadrunner/v2/interfaces/events"
)
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
- listeners []events.Listener
+ listeners []Listener
sync.RWMutex // all receivers should be pointers
}
-func NewEventsHandler() events.Handler {
- return &HandlerImpl{listeners: make([]events.Listener, 0, 2)}
+func NewEventsHandler() Handler {
+ return &HandlerImpl{listeners: make([]Listener, 0, 2)}
}
// NumListeners returns number of event listeners.
@@ -24,7 +22,7 @@ func (eb *HandlerImpl) NumListeners() int {
}
// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener events.Listener) {
+func (eb *HandlerImpl) AddListener(listener Listener) {
eb.Lock()
defer eb.Unlock()
eb.listeners = append(eb.listeners, listener)
diff --git a/interfaces/events/handler.go b/pkg/events/interface.go
index ac6c15a4..ac6c15a4 100644
--- a/interfaces/events/handler.go
+++ b/pkg/events/interface.go
diff --git a/interfaces/events/pool_events.go b/pkg/events/pool_events.go
index 2cc76eee..2cc76eee 100644
--- a/interfaces/events/pool_events.go
+++ b/pkg/events/pool_events.go
diff --git a/interfaces/events/worker_events.go b/pkg/events/worker_events.go
index 2bff1811..2bff1811 100644
--- a/interfaces/events/worker_events.go
+++ b/pkg/events/worker_events.go
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index e3e2d3cd..cf4aaaee 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick uint64 `mapstructure:"watch_tick"`
+ WatchTick uint64
// TTL defines maximum time worker is allowed to live.
- TTL uint64 `mapstructure:"ttl"`
+ TTL uint64
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64 `mapstructure:"idle_ttl"`
+ IdleTTL uint64
// ExecTTL defines maximum lifetime per job.
- ExecTTL uint64 `mapstructure:"exec_ttl"`
+ ExecTTL uint64
// MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
+ MaxWorkerMemory uint64
}
// InitDefaults enables default config values.
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
new file mode 100644
index 00000000..f3fe4065
--- /dev/null
+++ b/pkg/pool/interface.go
@@ -0,0 +1,29 @@
+package pool
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec executes task with payload
+ Exec(rqs payload.Payload) (payload.Payload, error)
+
+ // ExecWithContext executes task with context which is used with timeout
+ ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []*worker.SyncWorkerImpl)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker worker.SyncWorker) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d1b726c1..bb416b29 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -6,13 +6,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -34,7 +32,7 @@ type StaticPool struct {
cmd Command
// creates and connects to stack
- factory worker.Factory
+ factory transport.Factory
// distributes the events
events events.Handler
@@ -43,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww worker.Watcher
+ ww workerWatcher.Watcher
// allocate new worker
allocator worker.Allocator
@@ -53,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
cfg: cfg,
cmd: cmd,
factory: factory,
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
}
// add pool options
@@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
}
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
- p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
@@ -124,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
return sp.ww.RemoveWorker(wb)
}
@@ -153,12 +151,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// worker want's to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sp.stopWorker(&w)
+ sp.stopWorker(w)
return sp.Exec(p)
}
- err = sp.checkMaxJobs(&w)
+ err = sp.checkMaxJobs(w)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -183,11 +181,11 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
// worker want's to be terminated
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sp.stopWorker(&w)
+ sp.stopWorker(w)
return sp.ExecWithContext(ctx, p)
}
- err = sp.checkMaxJobs(&w)
+ err = sp.checkMaxJobs(w)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -195,30 +193,30 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return rsp, nil
}
-func (sp *StaticPool) stopWorker(w *worker.SyncWorker) {
+func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- (*w).State().Set(internal.StateInvalid)
- err := (*w).Stop()
+ w.State().Set(internal.StateInvalid)
+ err := w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error {
+func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
const op = errors.Op("static_pool_check_max_jobs")
- if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err := sp.ww.AllocateNew()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(*w)
+ sp.ww.PushWorker(w)
}
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -230,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w.(worker.SyncWorker), nil
+ return w, nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -239,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
if errors.Is(errors.ExecTTL, err) {
@@ -277,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (worker.BaseProcess, error) {
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (*worker.SyncWorkerImpl, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -286,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, err
- }
+ sw := worker.From(w)
sp.events.Push(events.PoolEvent{
Event: events.EventWorkerConstruct,
@@ -305,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, err
}
- r, err := sw.(worker.SyncWorker).Exec(p)
+ r, err := sw.Exec(p)
if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
@@ -315,9 +310,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ var workers []worker.SyncWorker
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
@@ -326,11 +321,7 @@ func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, e
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, errors.E(op, err)
- }
- workers = append(workers, sw)
+ workers = append(workers, w)
}
return workers, nil
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 348f5297..a877b28f 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,10 +12,10 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/stretchr/testify/assert"
)
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 19cda759..2bae8f9e 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -6,11 +6,10 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -20,7 +19,7 @@ const MB = 1024 * 1024
const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
type Supervised interface {
- pool.Pool
+ Pool
// Start used to start watching process for all pool workers
Start()
}
@@ -28,12 +27,12 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
events events.Handler
- pool pool.Pool
+ pool Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func supervisorWrapper(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
sp := &supervised{
cfg: cfg,
events: events,
@@ -101,13 +100,13 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+func (sp *supervised) Workers() (workers []*worker.SyncWorkerImpl) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
return sp.pool.RemoveWorker(worker)
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index a9424cd5..58f63b7e 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/spiral/roadrunner/v2/tools"
"github.com/stretchr/testify/assert"
)
diff --git a/interfaces/worker/factory.go b/pkg/transport/interface.go
index 376303df..299ac95f 100644
--- a/interfaces/worker/factory.go
+++ b/pkg/transport/interface.go
@@ -1,20 +1,21 @@
-package worker
+package transport
import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
// Factory is responsible of wrapping given command into tasks WorkerProcess.
type Factory interface {
// SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (BaseProcess, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
- SpawnWorker(*exec.Cmd, ...events.Listener) (BaseProcess, error)
+ SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error)
// Close the factory and underlying connections.
Close() error
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index b656eff8..dd7c5841 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -6,10 +6,9 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
@@ -19,22 +18,22 @@ type Factory struct{}
// NewPipeFactory returns new factory instance and starts
// listening
-func NewPipeFactory() worker.Factory {
+func NewPipeFactory() *Factory {
return &Factory{}
}
type SpawnResult struct {
- w worker.BaseProcess
+ w *worker.Process
err error
}
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
c := make(chan SpawnResult)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
c <- SpawnResult{
w: nil,
@@ -113,9 +112,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker")
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/pkg/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 805a24ee..2e5bbcd5 100644
--- a/pkg/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -8,15 +8,15 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func Test_GetState2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -28,14 +28,11 @@ func Test_GetState2(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, internal.StateReady, w.State().Value())
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
+ assert.NoError(t, w.Stop())
}
func Test_Kill2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
wg := &sync.WaitGroup{}
@@ -58,7 +55,7 @@ func Test_Kill2(t *testing.T) {
}
func Test_Pipe_Start2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.NoError(t, err)
@@ -72,7 +69,7 @@ func Test_Pipe_Start2(t *testing.T) {
}
func Test_Pipe_StartError2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err := cmd.Start()
if err != nil {
t.Errorf("error running the command: error %v", err)
@@ -84,7 +81,7 @@ func Test_Pipe_StartError2(t *testing.T) {
}
func Test_Pipe_PipeError3(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -96,7 +93,7 @@ func Test_Pipe_PipeError3(t *testing.T) {
}
func Test_Pipe_PipeError4(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -108,7 +105,7 @@ func Test_Pipe_PipeError4(t *testing.T) {
}
func Test_Pipe_Failboot2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
@@ -117,14 +114,14 @@ func Test_Pipe_Failboot2(t *testing.T) {
}
func Test_Pipe_Invalid2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Pipe_Echo2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
@@ -136,10 +133,7 @@ func Test_Pipe_Echo2(t *testing.T) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -152,7 +146,7 @@ func Test_Pipe_Echo2(t *testing.T) {
}
func Test_Pipe_Broken2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
@@ -163,10 +157,7 @@ func Test_Pipe_Broken2(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -178,7 +169,7 @@ func Test_Pipe_Broken2(t *testing.T) {
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorker(cmd)
go func() {
if w.Wait() != nil {
@@ -194,13 +185,11 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
+
b.ReportAllocs()
b.ResetTimer()
go func() {
@@ -224,7 +213,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
@@ -237,10 +226,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -250,7 +236,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
@@ -263,10 +249,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -276,28 +259,26 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
}
func Test_Echo2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
+
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -308,26 +289,23 @@ func Test_Echo2(t *testing.T) {
}
func Test_BadPayload2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{})
+ res, err := sw.Exec(payload.Payload{})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -337,7 +315,7 @@ func Test_BadPayload2(t *testing.T) {
}
func Test_String2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -350,13 +328,13 @@ func Test_String2(t *testing.T) {
}
}()
- assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
assert.Contains(t, w.String(), "ready")
assert.Contains(t, w.String(), "numExecs: 0")
}
func Test_Echo_Slow2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -369,12 +347,9 @@ func Test_Echo_Slow2(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -385,7 +360,7 @@ func Test_Echo_Slow2(t *testing.T) {
}
func Test_Broken2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
data := ""
mu := &sync.Mutex{}
listener := func(event interface{}) {
@@ -401,12 +376,9 @@ func Test_Broken2(t *testing.T) {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -421,7 +393,7 @@ func Test_Broken2(t *testing.T) {
}
func Test_Error2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -435,12 +407,9 @@ func Test_Error2(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -452,7 +421,7 @@ func Test_Error2(t *testing.T) {
}
func Test_NumExecs2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -465,24 +434,21 @@ func Test_NumExecs2(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index a2731294..fa37ac0f 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -9,16 +9,16 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func Test_GetState(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -38,7 +38,7 @@ func Test_GetState(t *testing.T) {
func Test_Kill(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
wg := &sync.WaitGroup{}
@@ -62,7 +62,7 @@ func Test_Kill(t *testing.T) {
func Test_Pipe_Start(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
@@ -76,7 +76,7 @@ func Test_Pipe_Start(t *testing.T) {
}
func Test_Pipe_StartError(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err := cmd.Start()
if err != nil {
t.Errorf("error running the command: error %v", err)
@@ -89,7 +89,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
func Test_Pipe_PipeError(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -102,7 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
func Test_Pipe_PipeError2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -115,7 +115,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
func Test_Pipe_Failboot(t *testing.T) {
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
@@ -125,7 +125,7 @@ func Test_Pipe_Failboot(t *testing.T) {
}
func Test_Pipe_Invalid(t *testing.T) {
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -133,7 +133,7 @@ func Test_Pipe_Invalid(t *testing.T) {
}
func Test_Pipe_Echo(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -146,10 +146,7 @@ func Test_Pipe_Echo(t *testing.T) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -162,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) {
}
func Test_Pipe_Broken(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -174,10 +171,7 @@ func Test_Pipe_Broken(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -189,7 +183,7 @@ func Test_Pipe_Broken(t *testing.T) {
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
go func() {
if w.Wait() != nil {
@@ -205,13 +199,11 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
+
b.ReportAllocs()
b.ResetTimer()
go func() {
@@ -235,7 +227,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -249,10 +241,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -262,7 +251,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -276,10 +265,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -290,28 +276,25 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
func Test_Echo(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -323,26 +306,23 @@ func Test_Echo(t *testing.T) {
func Test_BadPayload(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{})
+ res, err := sw.Exec(payload.Payload{})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -353,7 +333,7 @@ func Test_BadPayload(t *testing.T) {
func Test_String(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -366,14 +346,14 @@ func Test_String(t *testing.T) {
}
}()
- assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
assert.Contains(t, w.String(), "ready")
assert.Contains(t, w.String(), "numExecs: 0")
}
func Test_Echo_Slow(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -386,12 +366,9 @@ func Test_Echo_Slow(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -403,7 +380,7 @@ func Test_Echo_Slow(t *testing.T) {
func Test_Broken(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
data := ""
mu := &sync.Mutex{}
listener := func(event interface{}) {
@@ -419,12 +396,9 @@ func Test_Broken(t *testing.T) {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -440,7 +414,7 @@ func Test_Broken(t *testing.T) {
func Test_Error(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -454,12 +428,9 @@ func Test_Error(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -472,7 +443,7 @@ func Test_Error(t *testing.T) {
func Test_NumExecs(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -485,24 +456,21 @@ func Test_NumExecs(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 8f99ff73..ccd2b0bf 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -11,10 +11,9 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/socket"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
@@ -38,7 +37,7 @@ type Factory struct {
// NewSocketServer returns Factory attached to a given socket listener.
// tout specifies for how long factory should serve for incoming relay connection
-func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory {
+func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
f := &Factory{
ls: ls,
tout: tout,
@@ -80,18 +79,18 @@ func (f *Factory) listen() error {
}
type socketSpawn struct {
- w worker.BaseProcess
+ w *worker.Process
err error
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
ctx, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
c <- socketSpawn{
w: nil,
@@ -145,9 +144,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker")
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
return nil, err
}
diff --git a/pkg/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 2f21e408..0e29e7d2 100644
--- a/pkg/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -26,7 +26,7 @@ func Test_Tcp_Start2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
assert.NoError(t, err)
@@ -49,7 +49,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
f := NewSocketServer(ls, time.Minute)
defer func() {
@@ -82,7 +82,7 @@ func Test_Tcp_StartError2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err = cmd.Start()
if err != nil {
t.Errorf("error executing the command: error %v", err)
@@ -106,7 +106,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -127,7 +127,7 @@ func Test_Tcp_Invalid2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd)
assert.Error(t, err)
@@ -147,7 +147,7 @@ func Test_Tcp_Broken2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -169,10 +169,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, err2)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -194,7 +191,7 @@ func Test_Tcp_Echo2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -207,10 +204,7 @@ func Test_Tcp_Echo2(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -230,7 +224,7 @@ func Test_Unix_Start2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
assert.NoError(t, err)
@@ -254,7 +248,7 @@ func Test_Unix_Failboot2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -270,7 +264,7 @@ func Test_Unix_Timeout2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -286,7 +280,7 @@ func Test_Unix_Invalid2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd)
assert.Error(t, err)
@@ -301,7 +295,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -322,10 +316,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -343,7 +334,7 @@ func Test_Unix_Echo2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -359,10 +350,7 @@ func Test_Unix_Echo2(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -384,7 +372,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := f.SpawnWorker(cmd)
if err != nil {
@@ -409,7 +397,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
assert.NoError(b, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -422,10 +410,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -452,7 +437,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := f.SpawnWorker(cmd)
if err != nil {
@@ -481,7 +466,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -494,10 +479,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
diff --git a/pkg/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index 983f3e8e..f55fc3dd 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -29,7 +29,7 @@ func Test_Tcp_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
@@ -54,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
f := NewSocketServer(ls, time.Minute)
defer func() {
@@ -89,7 +89,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err = cmd.Start()
if err != nil {
t.Errorf("error executing the command: error %v", err)
@@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -139,7 +139,7 @@ func Test_Tcp_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -162,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -184,7 +184,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -206,10 +206,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err2)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -233,7 +230,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -246,10 +243,7 @@ func Test_Tcp_Echo(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -275,7 +269,7 @@ func Test_Unix_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
@@ -305,7 +299,7 @@ func Test_Unix_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -327,7 +321,7 @@ func Test_Unix_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -349,7 +343,7 @@ func Test_Unix_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -370,7 +364,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -391,10 +385,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -418,7 +409,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -434,10 +425,7 @@ func Test_Unix_Echo(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -465,7 +453,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -496,7 +484,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -509,10 +497,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -537,7 +522,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -564,7 +549,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -577,10 +562,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
diff --git a/interfaces/worker/worker.go b/pkg/worker/interface.go
index 0ac82158..9d74ae10 100644
--- a/interfaces/worker/worker.go
+++ b/pkg/worker/interface.go
@@ -10,9 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
)
-// Allocator is responsible for worker allocation in the pool
-type Allocator func() (BaseProcess, error)
-
type BaseProcess interface {
fmt.Stringer
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 8314c039..1a0393fb 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -8,50 +8,67 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
)
-type syncWorker struct {
- w worker.BaseProcess
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (*SyncWorkerImpl, error)
+
+type SyncWorkerImpl struct {
+ process *Process
}
// From creates SyncWorker from BaseProcess
-func From(w worker.BaseProcess) (worker.SyncWorker, error) {
- return &syncWorker{
- w: w,
- }, nil
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ }
+}
+
+// FromSync creates BaseProcess from SyncWorkerImpl
+func FromSync(w *SyncWorkerImpl) BaseProcess {
+ return &Process{
+ created: w.process.created,
+ events: w.process.events,
+ state: w.process.state,
+ cmd: w.process.cmd,
+ pid: w.process.pid,
+ stderr: w.process.stderr,
+ endState: w.process.endState,
+ relay: w.process.relay,
+ rd: w.process.rd,
+ }
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.w.State().Value() != internal.StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ if tw.process.State().Value() != internal.StateReady {
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
return rsp, nil
}
@@ -62,7 +79,7 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- if tw.w.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != internal.StateReady {
c <- wexec{
payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
c <- wexec{
payload: payload.Payload{},
@@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
c <- wexec{
payload: rsp,
@@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}
}
-func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
fr := frame.NewFrame()
@@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
frameR := frame.NewFrame()
- err = tw.w.Relay().Receive(frameR)
+ err = tw.process.Relay().Receive(frameR)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
return pl, nil
}
-func (tw *syncWorker) String() string {
- return tw.w.String()
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
}
-func (tw *syncWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
}
-func (tw *syncWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
}
-func (tw *syncWorker) State() internal.State {
- return tw.w.State()
+func (tw *SyncWorkerImpl) State() internal.State {
+ return tw.process.State()
}
-func (tw *syncWorker) Start() error {
- return tw.w.Start()
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
}
-func (tw *syncWorker) Wait() error {
- return tw.w.Wait()
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
}
-func (tw *syncWorker) Stop() error {
- return tw.w.Stop()
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
}
-func (tw *syncWorker) Kill() error {
- return tw.w.Kill()
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
}
-func (tw *syncWorker) Relay() relay.Relay {
- return tw.w.Relay()
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
}
-func (tw *syncWorker) AttachRelay(rl relay.Relay) {
- tw.w.AttachRelay(rl)
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index 40988b06..df556e93 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) {
w, _ := InitBaseWorker(cmd)
- syncWorker, err := From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index bf70d646..8fd71cca 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,10 +13,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"go.uber.org/multierr"
)
@@ -78,14 +76,14 @@ type Process struct {
}
// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) {
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
const op = errors.Op("init_base_worker")
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
w := &Process{
created: time.Now(),
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
@@ -198,7 +196,7 @@ func (w *Process) Wait() error {
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
- // and then w.cmd.Wait return an error
+ // and then process.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(internal.StateErrored)
diff --git a/interfaces/worker/watcher.go b/pkg/worker_watcher/interface.go
index ce2c1c5a..13991541 100644
--- a/interfaces/worker/watcher.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,16 +1,20 @@
-package worker
+package worker_watcher //nolint:golint,stylecheck
-import "context"
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
type Watcher interface {
// AddToWatch used to add stack to wait its state
- AddToWatch(workers []BaseProcess) error
+ AddToWatch(workers []worker.SyncWorker) error
// GetFreeWorker provide first free worker
- GetFreeWorker(ctx context.Context) (BaseProcess, error)
+ GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error)
// PutWorker enqueues worker back
- PushWorker(w BaseProcess)
+ PushWorker(w worker.SyncWorker)
// AllocateNew used to allocate new worker and put in into the WorkerWatcher
AllocateNew() error
@@ -19,8 +23,8 @@ type Watcher interface {
Destroy(ctx context.Context)
// WorkersList return all stack w/o removing it from internal storage
- WorkersList() []BaseProcess
+ WorkersList() []*worker.SyncWorkerImpl
// RemoveWorker remove worker from the stack
- RemoveWorker(wb BaseProcess) error
+ RemoveWorker(wb worker.SyncWorker) error
}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index c87e8b65..2d23d0e9 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -5,12 +5,12 @@ import (
"sync"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
- workers []worker.BaseProcess
+ workers []*worker.SyncWorkerImpl
mutex sync.RWMutex
destroy bool
actualNumOfWorkers uint64
@@ -20,7 +20,7 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]worker.BaseProcess, 0, w),
+ workers: make([]*worker.SyncWorkerImpl, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
@@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w)
+ stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
}
func (stack *Stack) IsEmpty() bool {
@@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool {
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
@@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.BaseProcess {
+func (stack *Stack) Workers() []*worker.SyncWorkerImpl {
stack.mutex.Lock()
defer stack.mutex.Unlock()
- workersCopy := make([]worker.BaseProcess, 0, 1)
+ workersCopy := make([]*worker.SyncWorkerImpl, 0, 1)
// copy
for _, v := range stack.workers {
- workersCopy = append(workersCopy, v)
+ if v != nil {
+ workersCopy = append(workersCopy, v)
+ }
}
return workersCopy
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
index 86af2043..5287a6dc 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/stack_test.go
@@ -5,24 +5,25 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func TestNewWorkersStack(t *testing.T) {
stack := NewWorkersStack(0)
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []worker.BaseProcess{}, stack.workers)
+ assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
}
func TestStack_Push(t *testing.T) {
stack := NewWorkersStack(1)
- w, err := workerImpl.InitBaseWorker(&exec.Cmd{})
+ w, err := worker.InitBaseWorker(&exec.Cmd{})
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
}
@@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
_, _ = stack.Pop()
@@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) {
func TestStack_FindAndRemoveByPid(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
stack.FindAndRemoveByPid(w.Pid())
@@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
assert.Equal(t, false, stack.IsEmpty())
@@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) {
func TestStack_Workers(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
wrks := stack.Workers()
assert.Equal(t, 1, len(wrks))
@@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) {
func TestStack_Reset(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
stack.Reset()
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
@@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) {
func TestStack_Destroy(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
stack.Destroy(context.Background())
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
}
@@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) {
func TestStack_DestroyWithWait(t *testing.T) {
stack := NewWorkersStack(2)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+ stack.Push(sw)
assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
go func() {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b0d39165..f87bd021 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -5,13 +5,13 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher {
ww := &workerWatcher{
stack: NewWorkersStack(uint64(numWorkers)),
allocator: allocator,
@@ -28,18 +28,18 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
+func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
- go func(swc worker.BaseProcess) {
+ go func(swc worker.SyncWorker) {
ww.wait(swc)
}(workers[i])
}
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error {
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
+func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.BaseProcess) {
+func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.BaseProcess {
+func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl {
return ww.stack.Workers()
}
-func (ww *workerWatcher) wait(w worker.BaseProcess) {
+func (ww *workerWatcher) wait(w worker.SyncWorker) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
@@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
+func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
go func() {
ww.wait(wb)
}()
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index ecdcb2c0..0e7481b5 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -10,8 +10,8 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/http/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index b1c68d89..144148af 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -15,9 +15,8 @@ import (
"github.com/hashicorp/go-multierror"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/checker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -104,7 +103,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
s.cfg.Env[RR_HTTP] = "true"
- s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{
+ s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{
Debug: s.cfg.Pool.Debug,
NumWorkers: s.cfg.Pool.NumWorkers,
MaxJobs: s.cfg.Pool.MaxJobs,
@@ -304,7 +303,12 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Workers returns associated pool workers
func (s *Plugin) Workers() []worker.BaseProcess {
- return s.pool.Workers()
+ workers := s.pool.Workers()
+ baseWorkers := make([]worker.BaseProcess, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ baseWorkers = append(baseWorkers, worker.FromSync(workers[i]))
+ }
+ return baseWorkers
}
// Name returns endure.Named interface implementation
@@ -322,7 +326,7 @@ func (s *Plugin) Reset() error {
s.pool = nil
var err error
- s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{
+ s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
Debug: s.cfg.Pool.Debug,
NumWorkers: s.cfg.Pool.NumWorkers,
MaxJobs: s.cfg.Pool.MaxJobs,
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 27139ae1..8e3b922b 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,6 +1,6 @@
package informer
-import "github.com/spiral/roadrunner/v2/interfaces/worker"
+import "github.com/spiral/roadrunner/v2/pkg/worker"
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 7200c51f..416c0112 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -3,7 +3,7 @@ package informer
import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 98b5681c..c036ae96 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,7 +1,7 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/tools"
)
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index a2d8b92b..fe04b85b 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -4,10 +4,10 @@ import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
// Env variables type alias
@@ -16,6 +16,6 @@ type Env map[string]string
// Server creates workers for the application.
type Server interface {
CmdFactory(env Env) (func() *exec.Cmd, error)
- NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error)
+ NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 0c9c49ea..16123786 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -8,16 +8,17 @@ import (
"strings"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
// core imports
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/socket"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/socket"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -33,7 +34,7 @@ const RR_RPC = "RR_RPC" //nolint:golint,stylecheck
type Plugin struct {
cfg Config
log logger.Logger
- factory worker.Factory
+ factory transport.Factory
}
// Init application provider.
@@ -115,7 +116,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("server_plugin_new_worker")
list := make([]events.Listener, 0, len(listeners))
@@ -157,7 +158,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en
}
// creates relay and worker factory.
-func (server *Plugin) initFactory() (worker.Factory, error) {
+func (server *Plugin) initFactory() (transport.Factory, error) {
const op = errors.Op("server_plugin_init_factory")
if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" {
return pipe.NewPipeFactory(), nil
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index ee6f795d..45931a49 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -10,8 +10,8 @@ import (
"runtime"
"strings"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/http/config"
"github.com/stretchr/testify/assert"
@@ -23,10 +23,10 @@ import (
)
func TestHandler_Echo(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -38,7 +38,7 @@ func TestHandler_Echo(t *testing.T) {
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -74,10 +74,10 @@ func Test_HandlerErrors(t *testing.T) {
}
func TestHandler_Headers(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -86,13 +86,13 @@ func TestHandler_Headers(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8078", Handler: h}
@@ -135,10 +135,10 @@ func TestHandler_Headers(t *testing.T) {
}
func TestHandler_Empty_User_Agent(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -147,13 +147,13 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8088", Handler: h}
@@ -195,10 +195,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
}
func TestHandler_User_Agent(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -207,13 +207,13 @@ func TestHandler_User_Agent(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8088", Handler: h}
@@ -255,10 +255,10 @@ func TestHandler_User_Agent(t *testing.T) {
}
func TestHandler_Cookies(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -267,13 +267,13 @@ func TestHandler_Cookies(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8079", Handler: h}
@@ -320,10 +320,10 @@ func TestHandler_Cookies(t *testing.T) {
}
func TestHandler_JsonPayload_POST(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -332,13 +332,13 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8090", Handler: h}
@@ -384,10 +384,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
}
func TestHandler_JsonPayload_PUT(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -396,13 +396,13 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8081", Handler: h}
@@ -444,10 +444,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
}
func TestHandler_JsonPayload_PATCH(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -456,13 +456,13 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8082", Handler: h}
@@ -504,10 +504,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
}
func TestHandler_FormData_POST(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -516,13 +516,13 @@ func TestHandler_FormData_POST(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8083", Handler: h}
@@ -577,10 +577,10 @@ func TestHandler_FormData_POST(t *testing.T) {
}
func TestHandler_FormData_POST_Overwrite(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -589,13 +589,13 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8083", Handler: h}
@@ -650,10 +650,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
}
func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -662,13 +662,13 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8083", Handler: h}
@@ -722,10 +722,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
}
func TestHandler_FormData_PUT(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -734,13 +734,13 @@ func TestHandler_FormData_PUT(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":17834", Handler: h}
@@ -794,10 +794,10 @@ func TestHandler_FormData_PUT(t *testing.T) {
}
func TestHandler_FormData_PATCH(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -806,13 +806,13 @@ func TestHandler_FormData_PATCH(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8085", Handler: h}
@@ -866,10 +866,10 @@ func TestHandler_FormData_PATCH(t *testing.T) {
}
func TestHandler_Multipart_POST(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -878,13 +878,13 @@ func TestHandler_Multipart_POST(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8019", Handler: h}
@@ -980,10 +980,10 @@ func TestHandler_Multipart_POST(t *testing.T) {
}
func TestHandler_Multipart_PUT(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -992,13 +992,13 @@ func TestHandler_Multipart_PUT(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8020", Handler: h}
@@ -1094,10 +1094,10 @@ func TestHandler_Multipart_PUT(t *testing.T) {
}
func TestHandler_Multipart_PATCH(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1106,13 +1106,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8021", Handler: h}
@@ -1210,10 +1210,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
}
func TestHandler_Error(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1222,13 +1222,13 @@ func TestHandler_Error(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -1256,10 +1256,10 @@ func TestHandler_Error(t *testing.T) {
}
func TestHandler_Error2(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1268,13 +1268,13 @@ func TestHandler_Error2(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -1302,10 +1302,10 @@ func TestHandler_Error2(t *testing.T) {
}
func TestHandler_Error3(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1314,13 +1314,13 @@ func TestHandler_Error3(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -1361,10 +1361,10 @@ func TestHandler_Error3(t *testing.T) {
}
func TestHandler_ResponseDuration(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1373,13 +1373,13 @@ func TestHandler_ResponseDuration(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -1422,10 +1422,10 @@ func TestHandler_ResponseDuration(t *testing.T) {
}
func TestHandler_ResponseDurationDelayed(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1434,13 +1434,13 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -1482,10 +1482,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
}
func TestHandler_ErrorDuration(t *testing.T) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1494,13 +1494,13 @@ func TestHandler_ErrorDuration(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(t, err)
hs := &http.Server{Addr: ":8177", Handler: h}
@@ -1556,10 +1556,10 @@ func TestHandler_IP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1568,13 +1568,13 @@ func TestHandler_IP(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, cidrs, pool)
+ }, cidrs, p)
assert.NoError(t, err)
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
@@ -1617,10 +1617,10 @@ func TestHandler_XRealIP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1629,13 +1629,13 @@ func TestHandler_XRealIP(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, cidrs, pool)
+ }, cidrs, p)
assert.NoError(t, err)
hs := &http.Server{Addr: "127.0.0.1:8179", Handler: h}
@@ -1683,10 +1683,10 @@ func TestHandler_XForwardedFor(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1695,13 +1695,13 @@ func TestHandler_XForwardedFor(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, cidrs, pool)
+ }, cidrs, p)
assert.NoError(t, err)
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
@@ -1748,10 +1748,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, cidrs)
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1760,13 +1760,13 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
t.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, cidrs, pool)
+ }, cidrs, p)
assert.NoError(t, err)
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
@@ -1796,10 +1796,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
}
func BenchmarkHandler_Listen_Echo(b *testing.B) {
- pool, err := poolImpl.Initialize(context.Background(),
+ p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ pool.Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1808,13 +1808,13 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
b.Fatal(err)
}
defer func() {
- pool.Destroy(context.Background())
+ p.Destroy(context.Background())
}()
h, err := httpPlugin.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
- }, nil, pool)
+ }, nil, p)
assert.NoError(b, err)
hs := &http.Server{Addr: ":8177", Handler: h}
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index c530f7d1..26b28165 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -19,7 +19,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index e03638d2..dd986902 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -16,8 +16,8 @@ import (
"time"
j "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/http/config"
"github.com/stretchr/testify/assert"
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index ba281d02..2e5af988 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -4,18 +4,18 @@ import (
"context"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = poolImpl.Config{
+var testPoolConfig = pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: &poolImpl.SupervisorConfig{
+ Supervisor: &pool.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
@@ -50,10 +50,16 @@ func (p1 *Plugin1) Name() string {
}
func (p1 *Plugin1) Workers() []worker.BaseProcess {
- pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
+ p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
}
- return pool.Workers()
+ workers := p.Workers()
+ baseWorkers := make([]worker.BaseProcess, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ baseWorkers = append(baseWorkers, worker.FromSync(workers[i]))
+ }
+
+ return baseWorkers
}
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index 5eb2fed1..f53b228f 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -5,8 +5,8 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -80,11 +80,7 @@ func (f *Foo) Serve() chan error {
}
// test that our worker is functional
- sw, err := worker.From(w)
- if err != nil {
- errCh <- err
- return errCh
- }
+ sw := worker.From(w)
rsp, err := sw.Exec(r)
if err != nil {
diff --git a/tests/plugins/server/plugin_sockets.go b/tests/plugins/server/plugin_sockets.go
index ede67ded..0b2857e3 100644
--- a/tests/plugins/server/plugin_sockets.go
+++ b/tests/plugins/server/plugin_sockets.go
@@ -4,8 +4,8 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -60,11 +60,7 @@ func (f *Foo2) Serve() chan error {
}
// test that our worker is functional
- sw, err := worker.From(w)
- if err != nil {
- errCh <- err
- return errCh
- }
+ sw := worker.From(w)
rsp, err := sw.Exec(r)
if err != nil {
diff --git a/tests/plugins/server/plugin_tcp.go b/tests/plugins/server/plugin_tcp.go
index 98c13b2b..ef4cea39 100644
--- a/tests/plugins/server/plugin_tcp.go
+++ b/tests/plugins/server/plugin_tcp.go
@@ -4,8 +4,8 @@ import (
"context"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -60,11 +60,7 @@ func (f *Foo3) Serve() chan error {
}
// test that our worker is functional
- sw, err := worker.From(w)
- if err != nil {
- errCh <- err
- return errCh
- }
+ sw := worker.From(w)
rsp, err := sw.Exec(r)
if err != nil {
diff --git a/tools/process.go b/tools/process.go
index 50fe1616..a01f2b24 100644
--- a/tools/process.go
+++ b/tools/process.go
@@ -3,7 +3,7 @@ package tools
import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
// ProcessState provides information about specific worker.