summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
committerValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
commit7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch)
tree3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg
parentee5d34abde7f3931bf939498eb7a8cb170232f4f (diff)
interfaces folder deprecated
Diffstat (limited to 'pkg')
-rw-r--r--pkg/doc/pool_workflow.drawio1
-rw-r--r--pkg/doc/workflow.drawio1
-rwxr-xr-xpkg/events/general.go (renamed from pkg/events/events.go)10
-rw-r--r--pkg/events/interface.go14
-rw-r--r--pkg/events/pool_events.go66
-rw-r--r--pkg/events/worker_events.go33
-rw-r--r--pkg/pool/config.go10
-rw-r--r--pkg/pool/interface.go29
-rwxr-xr-xpkg/pool/static_pool.go73
-rwxr-xr-xpkg/pool/static_pool_test.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go15
-rw-r--r--pkg/pool/supervisor_test.go2
-rw-r--r--pkg/transport/interface.go21
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go (renamed from pkg/pipe/pipe_factory.go)17
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go (renamed from pkg/pipe/pipe_factory_spawn_test.go)134
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go (renamed from pkg/pipe/pipe_factory_test.go)128
-rwxr-xr-xpkg/transport/socket/socket_factory.go (renamed from pkg/socket/socket_factory.go)17
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go (renamed from pkg/socket/socket_factory_spawn_test.go)64
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go (renamed from pkg/socket/socket_factory_test.go)66
-rw-r--r--pkg/worker/interface.go56
-rwxr-xr-xpkg/worker/sync_worker.go111
-rwxr-xr-xpkg/worker/sync_worker_test.go7
-rwxr-xr-xpkg/worker/worker.go10
-rw-r--r--pkg/worker_watcher/interface.go30
-rw-r--r--pkg/worker_watcher/stack.go18
-rw-r--r--pkg/worker_watcher/stack_test.go53
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go22
27 files changed, 587 insertions, 425 deletions
diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio
new file mode 100644
index 00000000..fd78d5a5
--- /dev/null
+++ b/pkg/doc/pool_workflow.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-01-23T19:14:50.556Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="3_IXXh0-4hROHlpEF-EL" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE=</diagram></mxfile> \ No newline at end of file
diff --git a/pkg/doc/workflow.drawio b/pkg/doc/workflow.drawio
new file mode 100644
index 00000000..d32d7b2d
--- /dev/null
+++ b/pkg/doc/workflow.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-01-23T19:13:52.763Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="OK65Xj3LZBjWN0qK7Y8T" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">jZJNT4QwEIZ/TY8mQCWy10XUxJiNclg9NnSWNimUdIuAv95iW6Ahm8iBzDzz0enbQThvxmdFOvYmKQiURHRE+BElySHOzH8GkwUZxhbUilOL4hWU/AccjBztOYVrkKilFJp3Iaxk20KlA0aUkkOYdpEiPLUjNexAWRGxp2dONfPTmW+NvACvmd6FGuLzHbgyQuWwQbhAOFdSams1Yw5ils9LY+uebkSX2RS0+j8FnylJH75O9cdrkZ3O6XF8Z5c71+WbiN7d2Q2rJy+Ckn1LYW4SIXwcGNdQdqSao4N5dcOYboTxYmO6dqA0jDfnjJfbm8UB2YBWk0nxBV4wtzQH5w7BC1jGNtrfO0bco9dL51UUYzhdvLvq/xfb7DEufgE=</diagram></mxfile> \ No newline at end of file
diff --git a/pkg/events/events.go b/pkg/events/general.go
index 226a0c91..a09a8759 100755
--- a/pkg/events/events.go
+++ b/pkg/events/general.go
@@ -2,18 +2,16 @@ package events
import (
"sync"
-
- "github.com/spiral/roadrunner/v2/interfaces/events"
)
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
- listeners []events.Listener
+ listeners []Listener
sync.RWMutex // all receivers should be pointers
}
-func NewEventsHandler() events.Handler {
- return &HandlerImpl{listeners: make([]events.Listener, 0, 2)}
+func NewEventsHandler() Handler {
+ return &HandlerImpl{listeners: make([]Listener, 0, 2)}
}
// NumListeners returns number of event listeners.
@@ -24,7 +22,7 @@ func (eb *HandlerImpl) NumListeners() int {
}
// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener events.Listener) {
+func (eb *HandlerImpl) AddListener(listener Listener) {
eb.Lock()
defer eb.Unlock()
eb.listeners = append(eb.listeners, listener)
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
new file mode 100644
index 00000000..ac6c15a4
--- /dev/null
+++ b/pkg/events/interface.go
@@ -0,0 +1,14 @@
+package events
+
+// Handler interface
+type Handler interface {
+ // Return number of active listeners
+ NumListeners() int
+ // AddListener adds lister to the publisher
+ AddListener(listener Listener)
+ // Push pushes event to the listeners
+ Push(e interface{})
+}
+
+// Event listener listens for the events produced by worker, worker pool or other service.
+type Listener func(event interface{})
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
new file mode 100644
index 00000000..2cc76eee
--- /dev/null
+++ b/pkg/events/pool_events.go
@@ -0,0 +1,66 @@
+package events
+
+// TODO event numbers
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 7800
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
new file mode 100644
index 00000000..2bff1811
--- /dev/null
+++ b/pkg/events/worker_events.go
@@ -0,0 +1,33 @@
+package events
+
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError W = iota + 200
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+)
+
+type W int64
+
+func (ev W) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event W
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index e3e2d3cd..cf4aaaee 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick uint64 `mapstructure:"watch_tick"`
+ WatchTick uint64
// TTL defines maximum time worker is allowed to live.
- TTL uint64 `mapstructure:"ttl"`
+ TTL uint64
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64 `mapstructure:"idle_ttl"`
+ IdleTTL uint64
// ExecTTL defines maximum lifetime per job.
- ExecTTL uint64 `mapstructure:"exec_ttl"`
+ ExecTTL uint64
// MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
+ MaxWorkerMemory uint64
}
// InitDefaults enables default config values.
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
new file mode 100644
index 00000000..f3fe4065
--- /dev/null
+++ b/pkg/pool/interface.go
@@ -0,0 +1,29 @@
+package pool
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec executes task with payload
+ Exec(rqs payload.Payload) (payload.Payload, error)
+
+ // ExecWithContext executes task with context which is used with timeout
+ ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []*worker.SyncWorkerImpl)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker worker.SyncWorker) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d1b726c1..bb416b29 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -6,13 +6,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -34,7 +32,7 @@ type StaticPool struct {
cmd Command
// creates and connects to stack
- factory worker.Factory
+ factory transport.Factory
// distributes the events
events events.Handler
@@ -43,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww worker.Watcher
+ ww workerWatcher.Watcher
// allocate new worker
allocator worker.Allocator
@@ -53,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
cfg: cfg,
cmd: cmd,
factory: factory,
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
}
// add pool options
@@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
}
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
- p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
@@ -124,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
return sp.ww.RemoveWorker(wb)
}
@@ -153,12 +151,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// worker want's to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sp.stopWorker(&w)
+ sp.stopWorker(w)
return sp.Exec(p)
}
- err = sp.checkMaxJobs(&w)
+ err = sp.checkMaxJobs(w)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -183,11 +181,11 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
// worker want's to be terminated
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sp.stopWorker(&w)
+ sp.stopWorker(w)
return sp.ExecWithContext(ctx, p)
}
- err = sp.checkMaxJobs(&w)
+ err = sp.checkMaxJobs(w)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -195,30 +193,30 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return rsp, nil
}
-func (sp *StaticPool) stopWorker(w *worker.SyncWorker) {
+func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- (*w).State().Set(internal.StateInvalid)
- err := (*w).Stop()
+ w.State().Set(internal.StateInvalid)
+ err := w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error {
+func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
const op = errors.Op("static_pool_check_max_jobs")
- if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err := sp.ww.AllocateNew()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(*w)
+ sp.ww.PushWorker(w)
}
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -230,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w.(worker.SyncWorker), nil
+ return w, nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -239,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
if errors.Is(errors.ExecTTL, err) {
@@ -277,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (worker.BaseProcess, error) {
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (*worker.SyncWorkerImpl, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -286,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, err
- }
+ sw := worker.From(w)
sp.events.Push(events.PoolEvent{
Event: events.EventWorkerConstruct,
@@ -305,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, err
}
- r, err := sw.(worker.SyncWorker).Exec(p)
+ r, err := sw.Exec(p)
if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
@@ -315,9 +310,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ var workers []worker.SyncWorker
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
@@ -326,11 +321,7 @@ func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, e
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, errors.E(op, err)
- }
- workers = append(workers, sw)
+ workers = append(workers, w)
}
return workers, nil
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 348f5297..a877b28f 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,10 +12,10 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/stretchr/testify/assert"
)
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 19cda759..2bae8f9e 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -6,11 +6,10 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -20,7 +19,7 @@ const MB = 1024 * 1024
const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
type Supervised interface {
- pool.Pool
+ Pool
// Start used to start watching process for all pool workers
Start()
}
@@ -28,12 +27,12 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
events events.Handler
- pool pool.Pool
+ pool Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func supervisorWrapper(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
sp := &supervised{
cfg: cfg,
events: events,
@@ -101,13 +100,13 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+func (sp *supervised) Workers() (workers []*worker.SyncWorkerImpl) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
return sp.pool.RemoveWorker(worker)
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index a9424cd5..58f63b7e 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/spiral/roadrunner/v2/tools"
"github.com/stretchr/testify/assert"
)
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
new file mode 100644
index 00000000..299ac95f
--- /dev/null
+++ b/pkg/transport/interface.go
@@ -0,0 +1,21 @@
+package transport
+
+import (
+ "context"
+ "os/exec"
+
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Factory is responsible of wrapping given command into tasks WorkerProcess.
+type Factory interface {
+ // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // Process must not be started.
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
+ // SpawnWorker creates new WorkerProcess process based on given command.
+ // Process must not be started.
+ SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error)
+ // Close the factory and underlying connections.
+ Close() error
+}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index b656eff8..dd7c5841 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -6,10 +6,9 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
@@ -19,22 +18,22 @@ type Factory struct{}
// NewPipeFactory returns new factory instance and starts
// listening
-func NewPipeFactory() worker.Factory {
+func NewPipeFactory() *Factory {
return &Factory{}
}
type SpawnResult struct {
- w worker.BaseProcess
+ w *worker.Process
err error
}
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
c := make(chan SpawnResult)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
c <- SpawnResult{
w: nil,
@@ -113,9 +112,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker")
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/pkg/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 805a24ee..2e5bbcd5 100644
--- a/pkg/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -8,15 +8,15 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func Test_GetState2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -28,14 +28,11 @@ func Test_GetState2(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, internal.StateReady, w.State().Value())
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
+ assert.NoError(t, w.Stop())
}
func Test_Kill2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
wg := &sync.WaitGroup{}
@@ -58,7 +55,7 @@ func Test_Kill2(t *testing.T) {
}
func Test_Pipe_Start2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.NoError(t, err)
@@ -72,7 +69,7 @@ func Test_Pipe_Start2(t *testing.T) {
}
func Test_Pipe_StartError2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err := cmd.Start()
if err != nil {
t.Errorf("error running the command: error %v", err)
@@ -84,7 +81,7 @@ func Test_Pipe_StartError2(t *testing.T) {
}
func Test_Pipe_PipeError3(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -96,7 +93,7 @@ func Test_Pipe_PipeError3(t *testing.T) {
}
func Test_Pipe_PipeError4(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -108,7 +105,7 @@ func Test_Pipe_PipeError4(t *testing.T) {
}
func Test_Pipe_Failboot2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
@@ -117,14 +114,14 @@ func Test_Pipe_Failboot2(t *testing.T) {
}
func Test_Pipe_Invalid2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Pipe_Echo2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
@@ -136,10 +133,7 @@ func Test_Pipe_Echo2(t *testing.T) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -152,7 +146,7 @@ func Test_Pipe_Echo2(t *testing.T) {
}
func Test_Pipe_Broken2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
@@ -163,10 +157,7 @@ func Test_Pipe_Broken2(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -178,7 +169,7 @@ func Test_Pipe_Broken2(t *testing.T) {
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorker(cmd)
go func() {
if w.Wait() != nil {
@@ -194,13 +185,11 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
+
b.ReportAllocs()
b.ResetTimer()
go func() {
@@ -224,7 +213,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
@@ -237,10 +226,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -250,7 +236,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
@@ -263,10 +249,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -276,28 +259,26 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
}
func Test_Echo2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
+
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -308,26 +289,23 @@ func Test_Echo2(t *testing.T) {
}
func Test_BadPayload2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{})
+ res, err := sw.Exec(payload.Payload{})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -337,7 +315,7 @@ func Test_BadPayload2(t *testing.T) {
}
func Test_String2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -350,13 +328,13 @@ func Test_String2(t *testing.T) {
}
}()
- assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
assert.Contains(t, w.String(), "ready")
assert.Contains(t, w.String(), "numExecs: 0")
}
func Test_Echo_Slow2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -369,12 +347,9 @@ func Test_Echo_Slow2(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -385,7 +360,7 @@ func Test_Echo_Slow2(t *testing.T) {
}
func Test_Broken2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
data := ""
mu := &sync.Mutex{}
listener := func(event interface{}) {
@@ -401,12 +376,9 @@ func Test_Broken2(t *testing.T) {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -421,7 +393,7 @@ func Test_Broken2(t *testing.T) {
}
func Test_Error2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -435,12 +407,9 @@ func Test_Error2(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -452,7 +421,7 @@ func Test_Error2(t *testing.T) {
}
func Test_NumExecs2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
@@ -465,24 +434,21 @@ func Test_NumExecs2(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index a2731294..fa37ac0f 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -9,16 +9,16 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func Test_GetState(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -38,7 +38,7 @@ func Test_GetState(t *testing.T) {
func Test_Kill(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
wg := &sync.WaitGroup{}
@@ -62,7 +62,7 @@ func Test_Kill(t *testing.T) {
func Test_Pipe_Start(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
@@ -76,7 +76,7 @@ func Test_Pipe_Start(t *testing.T) {
}
func Test_Pipe_StartError(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err := cmd.Start()
if err != nil {
t.Errorf("error running the command: error %v", err)
@@ -89,7 +89,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
func Test_Pipe_PipeError(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -102,7 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
func Test_Pipe_PipeError2(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
@@ -115,7 +115,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
func Test_Pipe_Failboot(t *testing.T) {
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
@@ -125,7 +125,7 @@ func Test_Pipe_Failboot(t *testing.T) {
}
func Test_Pipe_Invalid(t *testing.T) {
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -133,7 +133,7 @@ func Test_Pipe_Invalid(t *testing.T) {
}
func Test_Pipe_Echo(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -146,10 +146,7 @@ func Test_Pipe_Echo(t *testing.T) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -162,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) {
}
func Test_Pipe_Broken(t *testing.T) {
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -174,10 +171,7 @@ func Test_Pipe_Broken(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -189,7 +183,7 @@ func Test_Pipe_Broken(t *testing.T) {
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
go func() {
if w.Wait() != nil {
@@ -205,13 +199,11 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
+
b.ReportAllocs()
b.ResetTimer()
go func() {
@@ -235,7 +227,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -249,10 +241,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -262,7 +251,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -276,10 +265,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
}
}()
- sw, err := workerImpl.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -290,28 +276,25 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
func Test_Echo(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -323,26 +306,23 @@ func Test_Echo(t *testing.T) {
func Test_BadPayload(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
go func() {
- assert.NoError(t, syncWorker.Wait())
+ assert.NoError(t, sw.Wait())
}()
defer func() {
- err := syncWorker.Stop()
+ err := sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
}()
- res, err := syncWorker.Exec(payload.Payload{})
+ res, err := sw.Exec(payload.Payload{})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -353,7 +333,7 @@ func Test_BadPayload(t *testing.T) {
func Test_String(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -366,14 +346,14 @@ func Test_String(t *testing.T) {
}
}()
- assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
assert.Contains(t, w.String(), "ready")
assert.Contains(t, w.String(), "numExecs: 0")
}
func Test_Echo_Slow(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -386,12 +366,9 @@ func Test_Echo_Slow(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -403,7 +380,7 @@ func Test_Echo_Slow(t *testing.T) {
func Test_Broken(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
data := ""
mu := &sync.Mutex{}
listener := func(event interface{}) {
@@ -419,12 +396,9 @@ func Test_Broken(t *testing.T) {
t.Fatal(err)
}
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -440,7 +414,7 @@ func Test_Broken(t *testing.T) {
func Test_Error(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -454,12 +428,9 @@ func Test_Error(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -472,7 +443,7 @@ func Test_Error(t *testing.T) {
func Test_NumExecs(t *testing.T) {
ctx := context.Background()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -485,24 +456,21 @@ func Test_NumExecs(t *testing.T) {
}
}()
- syncWorker, err := workerImpl.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 8f99ff73..ccd2b0bf 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -11,10 +11,9 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/socket"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
@@ -38,7 +37,7 @@ type Factory struct {
// NewSocketServer returns Factory attached to a given socket listener.
// tout specifies for how long factory should serve for incoming relay connection
-func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory {
+func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
f := &Factory{
ls: ls,
tout: tout,
@@ -80,18 +79,18 @@ func (f *Factory) listen() error {
}
type socketSpawn struct {
- w worker.BaseProcess
+ w *worker.Process
err error
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
ctx, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
c <- socketSpawn{
w: nil,
@@ -145,9 +144,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
const op = errors.Op("factory_spawn_worker")
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
return nil, err
}
diff --git a/pkg/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 2f21e408..0e29e7d2 100644
--- a/pkg/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -26,7 +26,7 @@ func Test_Tcp_Start2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
assert.NoError(t, err)
@@ -49,7 +49,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
f := NewSocketServer(ls, time.Minute)
defer func() {
@@ -82,7 +82,7 @@ func Test_Tcp_StartError2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err = cmd.Start()
if err != nil {
t.Errorf("error executing the command: error %v", err)
@@ -106,7 +106,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -127,7 +127,7 @@ func Test_Tcp_Invalid2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd)
assert.Error(t, err)
@@ -147,7 +147,7 @@ func Test_Tcp_Broken2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -169,10 +169,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, err2)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -194,7 +191,7 @@ func Test_Tcp_Echo2(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
go func() {
@@ -207,10 +204,7 @@ func Test_Tcp_Echo2(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -230,7 +224,7 @@ func Test_Unix_Start2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
assert.NoError(t, err)
@@ -254,7 +248,7 @@ func Test_Unix_Failboot2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -270,7 +264,7 @@ func Test_Unix_Timeout2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd)
assert.Nil(t, w)
@@ -286,7 +280,7 @@ func Test_Unix_Invalid2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd)
assert.Error(t, err)
@@ -301,7 +295,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -322,10 +316,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -343,7 +334,7 @@ func Test_Unix_Echo2(t *testing.T) {
assert.NoError(t, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -359,10 +350,7 @@ func Test_Unix_Echo2(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -384,7 +372,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := f.SpawnWorker(cmd)
if err != nil {
@@ -409,7 +397,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
assert.NoError(b, err)
}()
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -422,10 +410,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -452,7 +437,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := f.SpawnWorker(cmd)
if err != nil {
@@ -481,7 +466,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
@@ -494,10 +479,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
diff --git a/pkg/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index 983f3e8e..f55fc3dd 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -29,7 +29,7 @@ func Test_Tcp_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
@@ -54,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
f := NewSocketServer(ls, time.Minute)
defer func() {
@@ -89,7 +89,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
err = cmd.Start()
if err != nil {
t.Errorf("error executing the command: error %v", err)
@@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -139,7 +139,7 @@ func Test_Tcp_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -162,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -184,7 +184,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -206,10 +206,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err2)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -233,7 +230,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
@@ -246,10 +243,7 @@ func Test_Tcp_Echo(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -275,7 +269,7 @@ func Test_Unix_Start(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
@@ -305,7 +299,7 @@ func Test_Unix_Failboot(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/failboot.php")
+ cmd := exec.Command("php", "../../../tests/failboot.php")
w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -327,7 +321,7 @@ func Test_Unix_Timeout(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
@@ -349,7 +343,7 @@ func Test_Unix_Invalid(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/invalid.php")
+ cmd := exec.Command("php", "../../../tests/invalid.php")
w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -370,7 +364,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -391,10 +385,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -418,7 +409,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -434,10 +425,7 @@ func Test_Unix_Echo(t *testing.T) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := worker.From(w)
res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
@@ -465,7 +453,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -496,7 +484,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -509,10 +497,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
@@ -537,7 +522,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -564,7 +549,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
b.Skip("socket is busy")
}
- cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
@@ -577,10 +562,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
}()
- sw, err := worker.From(w)
- if err != nil {
- b.Fatal(err)
- }
+ sw := worker.From(w)
for n := 0; n < b.N; n++ {
if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
new file mode 100644
index 00000000..9d74ae10
--- /dev/null
+++ b/pkg/worker/interface.go
@@ -0,0 +1,56 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+)
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() internal.State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop() error
+
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill() error
+
+ // Relay returns attached to worker goridge relay
+ Relay() relay.Relay
+
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl relay.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs payload.Payload) (payload.Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error)
+}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 8314c039..1a0393fb 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -8,50 +8,67 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
)
-type syncWorker struct {
- w worker.BaseProcess
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (*SyncWorkerImpl, error)
+
+type SyncWorkerImpl struct {
+ process *Process
}
// From creates SyncWorker from BaseProcess
-func From(w worker.BaseProcess) (worker.SyncWorker, error) {
- return &syncWorker{
- w: w,
- }, nil
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ }
+}
+
+// FromSync creates BaseProcess from SyncWorkerImpl
+func FromSync(w *SyncWorkerImpl) BaseProcess {
+ return &Process{
+ created: w.process.created,
+ events: w.process.events,
+ state: w.process.state,
+ cmd: w.process.cmd,
+ pid: w.process.pid,
+ stderr: w.process.stderr,
+ endState: w.process.endState,
+ relay: w.process.relay,
+ rd: w.process.rd,
+ }
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.w.State().Value() != internal.StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ if tw.process.State().Value() != internal.StateReady {
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
return rsp, nil
}
@@ -62,7 +79,7 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
@@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- if tw.w.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != internal.StateReady {
c <- wexec{
payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
// set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(internal.StateWorking)
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.w.State().Set(internal.StateErrored)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
}
c <- wexec{
payload: payload.Payload{},
@@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
return
}
- tw.w.State().Set(internal.StateReady)
- tw.w.State().RegisterExec()
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
c <- wexec{
payload: rsp,
@@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}
}
-func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
fr := frame.NewFrame()
@@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
frameR := frame.NewFrame()
- err = tw.w.Relay().Receive(frameR)
+ err = tw.process.Relay().Receive(frameR)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
return pl, nil
}
-func (tw *syncWorker) String() string {
- return tw.w.String()
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
}
-func (tw *syncWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
}
-func (tw *syncWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
}
-func (tw *syncWorker) State() internal.State {
- return tw.w.State()
+func (tw *SyncWorkerImpl) State() internal.State {
+ return tw.process.State()
}
-func (tw *syncWorker) Start() error {
- return tw.w.Start()
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
}
-func (tw *syncWorker) Wait() error {
- return tw.w.Wait()
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
}
-func (tw *syncWorker) Stop() error {
- return tw.w.Stop()
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
}
-func (tw *syncWorker) Kill() error {
- return tw.w.Kill()
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
}
-func (tw *syncWorker) Relay() relay.Relay {
- return tw.w.Relay()
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
}
-func (tw *syncWorker) AttachRelay(rl relay.Relay) {
- tw.w.AttachRelay(rl)
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index 40988b06..df556e93 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) {
w, _ := InitBaseWorker(cmd)
- syncWorker, err := From(w)
- if err != nil {
- t.Fatal(err)
- }
+ sw := From(w)
- res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index bf70d646..8fd71cca 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,10 +13,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"go.uber.org/multierr"
)
@@ -78,14 +76,14 @@ type Process struct {
}
// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) {
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
const op = errors.Op("init_base_worker")
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
w := &Process{
created: time.Now(),
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
@@ -198,7 +196,7 @@ func (w *Process) Wait() error {
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
- // and then w.cmd.Wait return an error
+ // and then process.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(internal.StateErrored)
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
new file mode 100644
index 00000000..13991541
--- /dev/null
+++ b/pkg/worker_watcher/interface.go
@@ -0,0 +1,30 @@
+package worker_watcher //nolint:golint,stylecheck
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Watcher interface {
+ // AddToWatch used to add stack to wait its state
+ AddToWatch(workers []worker.SyncWorker) error
+
+ // GetFreeWorker provide first free worker
+ GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error)
+
+ // PutWorker enqueues worker back
+ PushWorker(w worker.SyncWorker)
+
+ // AllocateNew used to allocate new worker and put in into the WorkerWatcher
+ AllocateNew() error
+
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+
+ // WorkersList return all stack w/o removing it from internal storage
+ WorkersList() []*worker.SyncWorkerImpl
+
+ // RemoveWorker remove worker from the stack
+ RemoveWorker(wb worker.SyncWorker) error
+}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index c87e8b65..2d23d0e9 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -5,12 +5,12 @@ import (
"sync"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
- workers []worker.BaseProcess
+ workers []*worker.SyncWorkerImpl
mutex sync.RWMutex
destroy bool
actualNumOfWorkers uint64
@@ -20,7 +20,7 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]worker.BaseProcess, 0, w),
+ workers: make([]*worker.SyncWorkerImpl, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
@@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w)
+ stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
}
func (stack *Stack) IsEmpty() bool {
@@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool {
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
@@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.BaseProcess {
+func (stack *Stack) Workers() []*worker.SyncWorkerImpl {
stack.mutex.Lock()
defer stack.mutex.Unlock()
- workersCopy := make([]worker.BaseProcess, 0, 1)
+ workersCopy := make([]*worker.SyncWorkerImpl, 0, 1)
// copy
for _, v := range stack.workers {
- workersCopy = append(workersCopy, v)
+ if v != nil {
+ workersCopy = append(workersCopy, v)
+ }
}
return workersCopy
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
index 86af2043..5287a6dc 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/stack_test.go
@@ -5,24 +5,25 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func TestNewWorkersStack(t *testing.T) {
stack := NewWorkersStack(0)
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []worker.BaseProcess{}, stack.workers)
+ assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
}
func TestStack_Push(t *testing.T) {
stack := NewWorkersStack(1)
- w, err := workerImpl.InitBaseWorker(&exec.Cmd{})
+ w, err := worker.InitBaseWorker(&exec.Cmd{})
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
}
@@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
_, _ = stack.Pop()
@@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) {
func TestStack_FindAndRemoveByPid(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
stack.FindAndRemoveByPid(w.Pid())
@@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
assert.Equal(t, false, stack.IsEmpty())
@@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) {
func TestStack_Workers(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
wrks := stack.Workers()
assert.Equal(t, 1, len(wrks))
@@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) {
func TestStack_Reset(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
stack.Reset()
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
@@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) {
func TestStack_Destroy(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
stack.Destroy(context.Background())
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
}
@@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) {
func TestStack_DestroyWithWait(t *testing.T) {
stack := NewWorkersStack(2)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+ stack.Push(sw)
assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
go func() {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b0d39165..f87bd021 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -5,13 +5,13 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher {
ww := &workerWatcher{
stack: NewWorkersStack(uint64(numWorkers)),
allocator: allocator,
@@ -28,18 +28,18 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
+func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
- go func(swc worker.BaseProcess) {
+ go func(swc worker.SyncWorker) {
ww.wait(swc)
}(workers[i])
}
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error {
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
+func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.BaseProcess) {
+func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.BaseProcess {
+func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl {
return ww.stack.Workers()
}
-func (ww *workerWatcher) wait(w worker.BaseProcess) {
+func (ww *workerWatcher) wait(w worker.SyncWorker) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
@@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
+func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
go func() {
ww.wait(wb)
}()