diff options
Diffstat (limited to 'pkg')
34 files changed, 986 insertions, 433 deletions
diff --git a/pkg/events/general.go b/pkg/events/general.go index a09a8759..5cf13e10 100755 --- a/pkg/events/general.go +++ b/pkg/events/general.go @@ -4,6 +4,8 @@ import ( "sync" ) +const UnknownEventType string = "Unknown event type" + // HandlerImpl helps to broadcast events to multiple listeners. type HandlerImpl struct { listeners []Listener diff --git a/pkg/events/interface.go b/pkg/events/interface.go index ac6c15a4..7d57e4d0 100644 --- a/pkg/events/interface.go +++ b/pkg/events/interface.go @@ -2,7 +2,7 @@ package events // Handler interface type Handler interface { - // Return number of active listeners + // NumListeners return number of active listeners NumListeners() int // AddListener adds lister to the publisher AddListener(listener Listener) @@ -10,5 +10,5 @@ type Handler interface { Push(e interface{}) } -// Event listener listens for the events produced by worker, worker pool or other service. +// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. type Listener func(event interface{}) diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go new file mode 100644 index 00000000..c0ee733a --- /dev/null +++ b/pkg/events/jobs_events.go @@ -0,0 +1,88 @@ +package events + +import ( + "time" +) + +const ( + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK = iota + 12000 + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventInitialized when pipeline has been initialized, but not started + EventInitialized + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady +) + +type J int64 + +func (ev J) String() string { + switch ev { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventInitialized: + return "EventInitialized" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + } + return UnknownEventType +} + +// JobEvent represent job event. +type JobEvent struct { + Event J + // String is job id. + ID string + + // Pipeline name + Pipeline string + + // Associated driver name (amqp, ephemeral, etc) + Driver string + + // Error for the jobs/pipes errors + Error error + + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go index e7b451e0..4d4cae5d 100644 --- a/pkg/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -57,7 +57,7 @@ func (ev P) String() string { case EventPoolRestart: return "EventPoolRestart" } - return "Unknown event type" + return UnknownEventType } // PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go index 11bd6ab7..39c38e57 100644 --- a/pkg/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -20,7 +20,7 @@ func (ev W) String() string { case EventWorkerStderr: return "EventWorkerStderr" } - return "Unknown event type" + return UnknownEventType } // WorkerEvent wraps worker events. diff --git a/pkg/pool/config.go b/pkg/pool/config.go index 2a3dabe4..3a058956 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -5,7 +5,7 @@ import ( "time" ) -// Configures the pool behavior. +// Config .. Pool config Configures the pool behavior. type Config struct { // Debug flag creates new fresh worker before every request. Debug bool diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index bbf7653e..4049122c 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -13,7 +13,7 @@ type Pool interface { GetConfig() interface{} // Exec executes task with payload - Exec(rqs payload.Payload) (payload.Payload, error) + Exec(rqs *payload.Payload) (*payload.Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) @@ -25,7 +25,7 @@ type Pool interface { Destroy(ctx context.Context) // ExecWithContext executes task with context which is used with timeout - execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error) } // Watcher is an interface for the Sync workers lifecycle @@ -33,11 +33,11 @@ type Watcher interface { // Watch used to add workers to the container Watch(workers []worker.BaseProcess) error - // Get provide first free worker - Get(ctx context.Context) (worker.BaseProcess, error) + // Take takes the first free worker + Take(ctx context.Context) (worker.BaseProcess, error) - // Push enqueues worker back - Push(w worker.BaseProcess) + // Release releases the worker putting it back to the queue + Release(w worker.BaseProcess) // Allocate - allocates new worker and put it into the WorkerWatcher Allocate() error diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 5a6247b5..051e7a8a 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -18,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) type Options func(p *StaticPool) @@ -26,7 +26,7 @@ type Command func() *exec.Cmd // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - cfg Config + cfg *Config // worker command creator cmd Command @@ -51,7 +51,7 @@ type StaticPool struct { } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) @@ -135,16 +135,16 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { } // Exec executes provided payload on the worker -func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec") if sp.cfg.Debug { return sp.execDebug(p) } ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxGetFree, op) + w, err := sp.takeWorker(ctxGetFree, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).Exec(p) @@ -163,12 +163,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { return rsp, nil } // return worker back - sp.ww.Push(w) + sp.ww.Release(w) return rsp, nil } // Be careful, sync with pool.Exec method -func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") if sp.cfg.Debug { return sp.execDebugWithTTL(ctx, p) @@ -176,9 +176,9 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxAlloc, op) + w, err := sp.takeWorker(ctxAlloc, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) @@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo } // return worker back - sp.ww.Push(w) + sp.ww.Release(w) return rsp, nil } @@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { if w.State().NumExecs() >= sp.cfg.MaxJobs { w.State().Set(worker.StateMaxJobsReached) - sp.ww.Push(w) + sp.ww.Release(w) return } - sp.ww.Push(w) + sp.ww.Release(w) } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { +func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { // Get function consumes context with timeout - w, err := sp.ww.Get(ctxGetFree) + w, err := sp.ww.Take(ctxGetFree) if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { @@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { @@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.SoftJob, err): if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // TODO suspicious logic, redesign err = sp.ww.Allocate() if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) @@ -265,7 +266,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } else { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) - sp.ww.Push(w) + sp.ww.Release(w) } } @@ -273,10 +274,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() if errS != nil { - return payload.Payload{}, errors.E(op, err, errS) + return nil, errors.E(op, err, errS) } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } } @@ -289,6 +290,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return nil, err } + // wrap sync worker sw := worker.From(w) sp.events.Push(events.PoolEvent{ @@ -300,26 +302,33 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio } // execDebug used when debug mode was not set and exec_ttl is 0 -func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("static_pool_exec_debug") sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } - // redirect call to the workers exec method (without ttl) + // redirect call to the workers' exec method (without ttl) r, err := sw.Exec(p) - if stopErr := sw.Stop(); stopErr != nil { + if err != nil { + return nil, errors.E(op, err) + } + + err = sw.Stop() + if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + return nil, errors.E(op, err) } - return r, err + return r, nil } // execDebugWithTTL used when user set debug mode and exec_ttl -func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the worker with TTL @@ -333,7 +342,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) ( // allocate required number of stack func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { - const op = errors.Op("allocate workers") + const op = errors.Op("static_pool_allocate_workers") workers := make([]worker.BaseProcess, 0, numWorkers) // constant number of stack simplify logic diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 6f875072..2ac2093d 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfg = Config{ +var cfg = &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.Exec") @@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block @@ -204,7 +202,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() - // Consume pool events + // Run pool events ev := make(chan struct{}, 1) listener := func(event interface{}) { if pe, ok := event.(events.PoolEvent); ok { @@ -214,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } } - var cfg2 = Config{ + var cfg2 = &Config{ NumWorkers: 1, AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -264,7 +262,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, @@ -283,7 +281,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -320,7 +318,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { assert.Len(t, p.Workers(), 0) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -360,7 +358,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -400,7 +398,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(payload.Payload{Body: []byte("100")}) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -422,7 +420,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, errP := p.Exec(payload.Payload{Body: []byte("100")}) + _, errP := p.Exec(&payload.Payload{Body: []byte("100")}) if errP != nil { t.Errorf("error executing payload: error %v", err) } @@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 100) p.Destroy(ctx) - _, err = p.Exec(payload.Payload{Body: []byte("100")}) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -452,7 +450,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p.Workers()[i].State().Set(worker.StateErrored) } - _, err = p.Exec(payload.Payload{Body: []byte("hello")}) + _, err = p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) p.Destroy(ctx) } @@ -476,7 +474,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -506,7 +504,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { // sleep for the 3 seconds func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ Debug: false, NumWorkers: 1, AllocateTimeout: time.Second, @@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) }() time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block @@ -539,7 +536,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -556,7 +553,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -567,6 +564,24 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { assert.Nil(t, p) } +/* PTR: +Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op +Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op +Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op +Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op +Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op +Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op +Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op + +VAL: +Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op +Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op +Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op +Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op +Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op +Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op +Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op +*/ func Benchmark_Pool_Echo(b *testing.B) { ctx := context.Background() p, err := Initialize( @@ -579,23 +594,33 @@ func Benchmark_Pool_Echo(b *testing.B) { b.Fatal(err) } + bd := make([]byte, 1024) + c := make([]byte, 1024) + + pld := &payload.Payload{ + Context: c, + Body: bd, + } + b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(pld); err != nil { b.Fail() } } } // Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op +// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op +// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx := context.Background() p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -604,12 +629,23 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { assert.NoError(b, err) defer p.Destroy(ctx) + bd := make([]byte, 1024) + c := make([]byte, 1024) + + pld := &payload.Payload{ + Context: c, + Body: bd, + } + + b.ResetTimer() + b.ReportAllocs() + var wg sync.WaitGroup for i := 0; i < b.N; i++ { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(pld); err != nil { b.Fail() log.Println(err) } @@ -626,7 +662,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -639,7 +675,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 4b990dbe..bdaeade1 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) return sp } -func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) { +func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { panic("used to satisfy pool interface") } -func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { +func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } return res, nil @@ -136,7 +136,7 @@ func (sp *supervised) control() { //nolint:gocognit /* worker at this point might be in the middle of request execution: - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here @@ -156,7 +156,7 @@ func (sp *supervised) control() { //nolint:gocognit /* worker at this point might be in the middle of request execution: - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here @@ -211,7 +211,7 @@ func (sp *supervised) control() { //nolint:gocognit /* worker at this point might be in the middle of request execution: - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here @@ -221,7 +221,7 @@ func (sp *supervised) control() { //nolint:gocognit workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } - // just to double check + // just to double-check workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 1cd301ba..0702a71f 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/require" ) -var cfgSupervised = Config{ +var cfgSupervised = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -84,7 +84,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) assert.Error(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) + assert.Empty(t, resp) time.Sleep(time.Second * 1) // should be new worker with new pid @@ -125,7 +124,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { } func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), Supervisor: &SupervisorConfig{ WatchTick: 1 * time.Second, @@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) pid = p.Workers()[0].Pid() - resp, err = p.Exec(payload.Payload{ + resp, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -177,7 +176,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { } func TestSupervisedPool_Idle(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -225,7 +224,7 @@ func TestSupervisedPool_Idle(t *testing.T) { } func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -267,7 +266,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { } func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -309,7 +308,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { } func TestSupervisedPool_MaxMemoryReached(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go new file mode 100644 index 00000000..fc043927 --- /dev/null +++ b/pkg/priority_queue/binary_heap.go @@ -0,0 +1,125 @@ +/* +binary heap (min-heap) algorithm used as a core for the priority queue +*/ + +package priorityqueue + +import ( + "sync" + "sync/atomic" +) + +type BinHeap struct { + items []Item + // find a way to use pointer to the raw data + len uint64 + maxLen uint64 + cond sync.Cond +} + +func NewBinHeap(maxLen uint64) *BinHeap { + return &BinHeap{ + items: make([]Item, 0, 1000), + len: 0, + maxLen: maxLen, + cond: sync.Cond{L: &sync.Mutex{}}, + } +} + +func (bh *BinHeap) fixUp() { + k := bh.len - 1 + p := (k - 1) >> 1 // k-1 / 2 + + for k > 0 { + cur, par := (bh.items)[k], (bh.items)[p] + + if cur.Priority() < par.Priority() { + bh.swap(k, p) + k = p + p = (k - 1) >> 1 + } else { + return + } + } +} + +func (bh *BinHeap) swap(i, j uint64) { + (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] +} + +func (bh *BinHeap) fixDown(curr, end int) { + cOneIdx := (curr << 1) + 1 + for cOneIdx <= end { + cTwoIdx := -1 + if (curr<<1)+2 <= end { + cTwoIdx = (curr << 1) + 2 + } + + idxToSwap := cOneIdx + if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { + idxToSwap = cTwoIdx + } + if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { + bh.swap(uint64(curr), uint64(idxToSwap)) + curr = idxToSwap + cOneIdx = (curr << 1) + 1 + } else { + return + } + } +} + +func (bh *BinHeap) Len() uint64 { + return atomic.LoadUint64(&bh.len) +} + +func (bh *BinHeap) Insert(item Item) { + bh.cond.L.Lock() + + // check the binary heap len before insertion + if bh.Len() > bh.maxLen { + // unlock the mutex to proceed to get-max + bh.cond.L.Unlock() + + // signal waiting goroutines + for bh.Len() > 0 { + // signal waiting goroutines + bh.cond.Signal() + } + // lock mutex to proceed inserting into the empty slice + bh.cond.L.Lock() + } + + bh.items = append(bh.items, item) + + // add len to the slice + atomic.AddUint64(&bh.len, 1) + + // fix binary heap up + bh.fixUp() + bh.cond.L.Unlock() + + // signal the goroutine on wait + bh.cond.Signal() +} + +func (bh *BinHeap) ExtractMin() Item { + bh.cond.L.Lock() + + // if len == 0, wait for the signal + for bh.Len() == 0 { + bh.cond.Wait() + } + + bh.swap(0, bh.len-1) + + item := (bh.items)[int(bh.len)-1] + bh.items = (bh).items[0 : int(bh.len)-1] + bh.fixDown(0, int(bh.len-2)) + + // reduce len + atomic.AddUint64(&bh.len, ^uint64(0)) + + bh.cond.L.Unlock() + return item +} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go new file mode 100644 index 00000000..fb5b83de --- /dev/null +++ b/pkg/priority_queue/binary_heap_test.go @@ -0,0 +1,128 @@ +package priorityqueue + +import ( + "fmt" + "math/rand" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type Test int + +func (t Test) Ack() error { + return nil +} + +func (t Test) Nack() error { + return nil +} + +func (t Test) Requeue(_ map[string][]string, _ int64) error { + return nil +} + +func (t Test) Body() []byte { + return nil +} + +func (t Test) Context() ([]byte, error) { + return nil, nil +} + +func (t Test) ID() string { + return "none" +} + +func (t Test) Priority() int64 { + return int64(t) +} + +func TestBinHeap_Init(t *testing.T) { + a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + bh := NewBinHeap(12) + + for i := 0; i < len(a); i++ { + bh.Insert(a[i]) + } + + expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + + res := make([]Item, 0, 12) + + for i := 0; i < 11; i++ { + item := bh.ExtractMin() + res = append(res, item) + } + + require.Equal(t, expected, res) +} + +func TestNewPriorityQueue(t *testing.T) { + insertsPerSec := uint64(0) + getPerSec := uint64(0) + stopCh := make(chan struct{}, 1) + pq := NewBinHeap(1000) + + go func() { + tt3 := time.NewTicker(time.Millisecond * 10) + for { + select { + case <-tt3.C: + require.Less(t, pq.Len(), uint64(1002)) + case <-stopCh: + return + } + } + }() + + go func() { + tt := time.NewTicker(time.Second) + + for { + select { + case <-tt.C: + fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) + atomic.StoreUint64(&insertsPerSec, 0) + fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec))) + atomic.StoreUint64(&getPerSec, 0) + case <-stopCh: + tt.Stop() + return + } + } + }() + + go func() { + for { + select { + case <-stopCh: + return + default: + pq.ExtractMin() + atomic.AddUint64(&getPerSec, 1) + } + } + }() + + go func() { + for { + select { + case <-stopCh: + return + default: + pq.Insert(Test(rand.Int())) //nolint:gosec + atomic.AddUint64(&insertsPerSec, 1) + } + } + }() + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + stopCh <- struct{}{} + stopCh <- struct{}{} + stopCh <- struct{}{} +} diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go new file mode 100644 index 00000000..9efa4652 --- /dev/null +++ b/pkg/priority_queue/interface.go @@ -0,0 +1,31 @@ +package priorityqueue + +type Queue interface { + Insert(item Item) + ExtractMin() Item + Len() uint64 +} + +// Item represents binary heap item +type Item interface { + // ID is a unique item identifier + ID() string + + // Priority returns the Item's priority to sort + Priority() int64 + + // Body is the Item payload + Body() []byte + + // Context is the Item meta information + Context() ([]byte, error) + + // Ack - acknowledge the Item after processing + Ack() error + + // Nack - discard the Item + Nack() error + + // Requeue - put the message back to the queue with the optional delay + Requeue(headers map[string][]string, delay int64) error +} diff --git a/pkg/process/state.go b/pkg/process/state.go index 652ec77c..bfc3a287 100644 --- a/pkg/process/state.go +++ b/pkg/process/state.go @@ -32,20 +32,20 @@ type State struct { } // WorkerProcessState creates new worker state definition. -func WorkerProcessState(w worker.BaseProcess) (State, error) { +func WorkerProcessState(w worker.BaseProcess) (*State, error) { const op = errors.Op("worker_process_state") p, _ := process.NewProcess(int32(w.Pid())) i, err := p.MemoryInfo() if err != nil { - return State{}, errors.E(op, err) + return nil, errors.E(op, err) } percent, err := p.CPUPercent() if err != nil { - return State{}, err + return nil, err } - return State{ + return &State{ CPUPercent: percent, Pid: int(w.Pid()), Status: w.State().String(), diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go deleted file mode 100644 index 06252d70..00000000 --- a/pkg/pubsub/interface.go +++ /dev/null @@ -1,54 +0,0 @@ -package pubsub - -/* -This interface is in BETA. It might be changed. -*/ - -// PubSub interface designed to implement on any storage type to provide pub-sub abilities -// Publisher used to receive messages from the PHP app via RPC -// Subscriber should be implemented to subscribe to a topics and provide a connections list per topic -// Reader return next message from the channel -type PubSub interface { - Publisher - Subscriber - Reader -} - -type SubReader interface { - Subscriber - Reader -} - -// Subscriber defines the ability to operate as message passing broker. -// BETA interface -type Subscriber interface { - // Subscribe broker to one or multiple topics. - Subscribe(connectionID string, topics ...string) error - - // Unsubscribe from one or multiply topics - Unsubscribe(connectionID string, topics ...string) error - - // Connections returns all connections associated with the particular topic - Connections(topic string, ret map[string]struct{}) -} - -// Publisher publish one or more messages -// BETA interface -type Publisher interface { - // Publish one or multiple Channel. - Publish(message *Message) error - - // PublishAsync publish message and return immediately - // If error occurred it will be printed into the logger - PublishAsync(message *Message) -} - -// Reader interface should return next message -type Reader interface { - Next() (*Message, error) -} - -// Constructor is a special pub-sub interface made to return a constructed PubSub type -type Constructor interface { - PSConstruct(key string) (PubSub, error) -} diff --git a/pkg/pubsub/psmessage.go b/pkg/pubsub/psmessage.go deleted file mode 100644 index e33d9284..00000000 --- a/pkg/pubsub/psmessage.go +++ /dev/null @@ -1,15 +0,0 @@ -package pubsub - -import json "github.com/json-iterator/go" - -// Message represents a single message with payload bound to a particular topic -type Message struct { - // Topic (channel in terms of redis) - Topic string `json:"topic"` - // Payload (on some decode stages might be represented as base64 string) - Payload []byte `json:"payload"` -} - -func (m *Message) MarshalBinary() (data []byte, err error) { - return json.Marshal(m) -} diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go index 7e3e5350..1b072378 100644 --- a/pkg/transport/interface.go +++ b/pkg/transport/interface.go @@ -8,7 +8,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker" ) -// Factory is responsible of wrapping given command into tasks WorkerProcess. +// Factory is responsible for wrapping given command into tasks WorkerProcess. type Factory interface { // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index 19f4f92d..9433a510 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -22,42 +22,54 @@ func NewPipeFactory() *Factory { return &Factory{} } -type SpawnResult struct { +type sr struct { w *worker.Process err error } // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - c := make(chan SpawnResult) +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit + spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } in, err := cmd.StdoutPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } out, err := cmd.StdinPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } // Init new PIPE relay @@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis // Start the worker err = w.Start() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } - // errors bundle pid, err := internal.FetchPID(relay) - if pid != w.Pid() || err != nil { + if err != nil { err = multierr.Combine( err, w.Kill(), w.Wait(), ) - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + _ = w.Kill() + return } - return } - // everything ok, set ready state - w.State().Set(worker.StateReady) + if pid != w.Pid() { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), + }: + return + default: + _ = w.Kill() + return + } + } + select { + case // return worker - c <- SpawnResult{ + spCh <- sr{ w: w, err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return } }() select { case <-ctx.Done(): return nil, ctx.Err() - case res := <-c: + case res := <-spCh: if res.err != nil { return nil, res.err } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 51befb1e..f5e9669b 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -144,7 +144,7 @@ func Test_Pipe_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -168,11 +168,10 @@ func Test_Pipe_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) } func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { @@ -215,7 +214,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -238,7 +237,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -261,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -287,7 +286,7 @@ func Test_Echo2(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -314,11 +313,10 @@ func Test_BadPayload2(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{}) + res, err := sw.Exec(&payload.Payload{}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "payload can not be empty") } @@ -358,7 +356,7 @@ func Test_Echo_Slow2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -387,10 +385,9 @@ func Test_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) time.Sleep(time.Second * 3) mu.Lock() @@ -418,10 +415,9 @@ func Test_Error2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") @@ -445,19 +441,19 @@ func Test_NumExecs2(t *testing.T) { sw := worker.From(w) - _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 3ef65be8..d243a93f 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -102,6 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) { func Test_Pipe_PipeError2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + // error cause _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -159,7 +160,7 @@ func Test_Pipe_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -184,11 +185,10 @@ func Test_Pipe_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) } func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { @@ -231,7 +231,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -255,7 +255,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -279,7 +279,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -305,7 +305,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -333,11 +333,10 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{}) + res, err := sw.Exec(&payload.Payload{}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "payload can not be empty") } @@ -379,7 +378,7 @@ func Test_Echo_Slow(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -409,10 +408,9 @@ func Test_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) time.Sleep(time.Second * 3) mu.Lock() @@ -441,10 +439,9 @@ func Test_Error(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") @@ -469,19 +466,19 @@ func Test_NumExecs(t *testing.T) { sw := worker.From(w) - _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 965a0f30..dc2b75cf 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -2,6 +2,7 @@ package socket import ( "context" + "fmt" "net" "os/exec" "sync" @@ -29,8 +30,6 @@ type Factory struct { // sockets which are waiting for process association relays sync.Map - - ErrCh chan error } // NewSocketServer returns Factory attached to a given socket listener. @@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { ls: ls, tout: tout, relays: sync.Map{}, - ErrCh: make(chan error, 10), } // Be careful // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go // https://github.com/golang/go/issues/5045 go func() { - f.ErrCh <- f.listen() + err := f.listen() + // there is no logger here, use fmt + if err != nil { + fmt.Printf("[WARN]: socket server listen, error: %v\n", err) + } }() return f @@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis defer cancel() w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, - err: err, + err: errors.E(op, err), + }: + return + default: + return } - return } err = w.Start() if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } rl, err := f.findRelayWithContext(ctxT, w) @@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis w.Wait(), ) - c <- socketSpawn{ + select { + // try to write result + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + // if no receivers - return + default: + return } - return } w.AttachRelay(rl) w.State().Set(worker.StateReady) - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: w, err: nil, + }: + return + default: + _ = w.Kill() + return } }() @@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) + + // errors bundle + if pid, err := internal.FetchPID(rl); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + w.State().Set(worker.StateReady) return w, nil diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index b875e2c8..905a3b6b 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -16,7 +16,7 @@ import ( ) func Test_Tcp_Start2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -45,7 +45,7 @@ func Test_Tcp_Start2(t *testing.T) { } func Test_Tcp_StartCloseFactory2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { } else { t.Skip("socket is busy") @@ -72,7 +72,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { } func Test_Tcp_StartError2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -96,7 +96,7 @@ func Test_Tcp_StartError2(t *testing.T) { } func Test_Tcp_Failboot2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err3 := ls.Close() @@ -128,7 +128,7 @@ func Test_Tcp_Failboot2(t *testing.T) { } func Test_Tcp_Invalid2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -148,7 +148,7 @@ func Test_Tcp_Invalid2(t *testing.T) { } func Test_Tcp_Broken2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -194,16 +194,15 @@ func Test_Tcp_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) wg.Wait() <-finish } func Test_Tcp_Echo2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) wg.Wait() <-finish } @@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index 34fe088b..f9bb2178 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -19,7 +19,7 @@ func Test_Tcp_Start(t *testing.T) { ctx := context.Background() time.Sleep(time.Millisecond * 10) // to ensure free socket - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -50,7 +50,7 @@ func Test_Tcp_Start(t *testing.T) { func Test_Tcp_StartCloseFactory(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { } else { t.Skip("socket is busy") @@ -79,7 +79,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { func Test_Tcp_StartError(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -106,7 +106,7 @@ func Test_Tcp_Failboot(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err3 := ls.Close() @@ -140,7 +140,7 @@ func Test_Tcp_Failboot(t *testing.T) { func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -163,7 +163,7 @@ func Test_Tcp_Timeout(t *testing.T) { func Test_Tcp_Invalid(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -185,7 +185,7 @@ func Test_Tcp_Invalid(t *testing.T) { func Test_Tcp_Broken(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) wg.Wait() <-finish } @@ -242,7 +241,7 @@ func Test_Tcp_Broken(t *testing.T) { func Test_Tcp_Echo(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block wg.Wait() } @@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -487,7 +485,7 @@ func Test_Unix_Echo(t *testing.T) { func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if err == nil { defer func() { err = ls.Close() @@ -520,7 +518,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if err == nil { defer func() { err = ls.Close() @@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index d2cfe2cd..ed8704bb 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -68,7 +68,7 @@ type SyncWorker interface { // BaseProcess provides basic functionality for the SyncWorker BaseProcess // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs payload.Payload) (payload.Payload, error) + Exec(rqs *payload.Payload) (*payload.Payload, error) // ExecWithTTL used to handle Exec with TTL - ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 02f11d0b..74e29b71 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -23,7 +23,7 @@ type SyncWorkerImpl struct { } // From creates SyncWorker from BaseProcess -func From(process *Process) SyncWorker { +func From(process *Process) *SyncWorkerImpl { return &SyncWorkerImpl{ process: process, fPool: sync.Pool{New: func() interface{} { @@ -36,14 +36,14 @@ func From(process *Process) SyncWorker { } // Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + return nil, errors.E(op, errors.Str("payload can not be empty")) } if tw.process.State().Value() != StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time @@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } // supervisor may set state of the worker during the work @@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { } type wexec struct { - payload payload.Payload + payload *payload.Payload err error } // ExecWithTTL executes payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, errors.Str("payload can not be empty")), + err: errors.E(op, errors.Str("payload can not be empty")), } return } if tw.process.State().Value() != StateReady { c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } @@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p tw.process.State().RegisterExec() } c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, err), + err: errors.E(op, err), } return } @@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p if err != nil { // append timeout error err = multierr.Append(err, errors.E(op, errors.ExecTTL)) - return payload.Payload{}, multierr.Append(err, ctx.Err()) + return nil, multierr.Append(err, ctx.Err()) } - return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err()) + return nil, errors.E(op, errors.ExecTTL, ctx.Err()) case res := <-c: if res.err != nil { - return payload.Payload{}, res.err + return nil, res.err } return res.payload, nil } } -func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") // get a frame @@ -162,7 +159,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error defer tw.putFrame(fr) // can be 0 here - fr.WriteVersion(frame.VERSION_1) + fr.WriteVersion(fr.Header(), frame.VERSION_1) // obtain a buffer buf := tw.get() @@ -171,18 +168,18 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error buf.Write(p.Body) // Context offset - fr.WriteOptions(uint32(len(p.Context))) - fr.WritePayloadLen(uint32(buf.Len())) + fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) + fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) fr.WritePayload(buf.Bytes()) - fr.WriteCRC() + fr.WriteCRC(fr.Header()) // return buffer tw.put(buf) err := tw.Relay().Send(fr) if err != nil { - return payload.Payload{}, errors.E(op, errors.Network, err) + return nil, errors.E(op, errors.Network, err) } frameR := tw.getFrame() @@ -190,34 +187,34 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error err = tw.process.Relay().Receive(frameR) if err != nil { - return payload.Payload{}, errors.E(op, errors.Network, err) + return nil, errors.E(op, errors.Network, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received")) + return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) } - if !frameR.VerifyCRC() { - return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) + if !frameR.VerifyCRC(frameR.Header()) { + return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() if flags&frame.ERROR != byte(0) { - return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) + return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } - options := frameR.ReadOptions() + options := frameR.ReadOptions(frameR.Header()) if len(options) != 1 { - return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) + return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } - pld := payload.Payload{ + pld := &payload.Payload{ Body: make([]byte, len(frameR.Payload()[options[0]:])), Context: make([]byte, len(frameR.Payload()[:options[0]])), } // by copying we free frame's payload slice - // so we do not hold the pointer from the smaller slice to the initial (which is should be in the sync.Pool) + // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) // https://blog.golang.org/slices-intro#TOC_6. copy(pld.Body, frameR.Payload()[options[0]:]) copy(pld.Context, frameR.Payload()[:options[0]]) diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index df556e93..64580f9f 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) { sw := From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "Process is not ready (inactive)") } diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go index 44c466bb..3d60897b 100644 --- a/pkg/worker_handler/request.go +++ b/pkg/worker_handler/request.go @@ -138,18 +138,18 @@ func (r *Request) Close(log logger.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (payload.Payload, error) { +func (r *Request) Payload() (*payload.Payload, error) { const op = errors.Op("marshal_payload") - p := payload.Payload{} + p := &payload.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return payload.Payload{}, errors.E(op, errors.Encode, err) + return nil, errors.E(op, errors.Encode, err) } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return payload.Payload{}, errors.E(op, errors.Encode, err) + return nil, errors.E(op, errors.Encode, err) } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go index cbf22794..d22f09d4 100644 --- a/pkg/worker_handler/response.go +++ b/pkg/worker_handler/response.go @@ -22,7 +22,7 @@ type Response struct { } // NewResponse creates new response based on given pool payload. -func NewResponse(p payload.Payload) (*Response, error) { +func NewResponse(p *payload.Payload) (*Response, error) { const op = errors.Op("http_response") r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go new file mode 100644 index 00000000..51093978 --- /dev/null +++ b/pkg/worker_watcher/container/channel/vec.go @@ -0,0 +1,99 @@ +package channel + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + sync.RWMutex + // destroy signal + destroy uint64 + // channel with the workers + workers chan worker.BaseProcess + + len uint64 +} + +func NewVector(len uint64) *Vec { + vec := &Vec{ + destroy: 0, + len: len, + workers: make(chan worker.BaseProcess, len), + } + + return vec +} + +// Push is O(1) operation +// In case of TTL and full channel O(n) worst case, where n is len of the channel +func (v *Vec) Push(w worker.BaseProcess) { + // Non-blocking channel send + select { + case v.workers <- w: + // default select branch is only possible when dealing with TTL + // because in that case, workers in the v.workers channel can be TTL-ed and killed + // but presenting in the channel + default: + v.Lock() + defer v.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) + */ + for i := uint64(0); i < v.len; i++ { + wrk := <-v.workers + switch wrk.State().Value() { + // skip good states + case worker.StateWorking, worker.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + v.workers <- wrk + continue + default: + // kill the current worker (just to be sure it's dead) + _ = wrk.Kill() + // replace with the new one + v.workers <- w + return + } + } + } +} + +func (v *Vec) Remove(_ int64) {} + +func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { + /* + if *addr == old { + *addr = new + return true + } + */ + + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + return nil, errors.E(errors.WatcherStopped) + } + + // used only for the TTL-ed workers + v.RLock() + defer v.RUnlock() + + select { + case w := <-v.workers: + return w, nil + case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) + } +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go deleted file mode 100644 index e10ecdae..00000000 --- a/pkg/worker_watcher/container/interface.go +++ /dev/null @@ -1,17 +0,0 @@ -package container - -import ( - "context" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// Vector interface represents vector container -type Vector interface { - // Enqueue used to put worker to the vector - Enqueue(worker.BaseProcess) - // Dequeue used to get worker from the vector - Dequeue(ctx context.Context) (worker.BaseProcess, error) - // Destroy used to stop releasing the workers - Destroy() -} diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go new file mode 100644 index 00000000..edf81d60 --- /dev/null +++ b/pkg/worker_watcher/container/queue/queue.go @@ -0,0 +1,102 @@ +package queue + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +const ( + initialSize = 1 + maxInitialSize = 8 + maxInternalSliceSize = 10 +) + +type Node struct { + w []worker.BaseProcess + // LL + n *Node +} + +type Queue struct { + mu sync.Mutex + + head *Node + tail *Node + + curr uint64 + len uint64 + + sliceSize uint64 +} + +func NewQueue() *Queue { + q := &Queue{ + mu: sync.Mutex{}, + head: nil, + tail: nil, + curr: 0, + len: 0, + sliceSize: 0, + } + + return q +} + +func (q *Queue) Push(w worker.BaseProcess) { + q.mu.Lock() + + if q.head == nil { + h := newNode(initialSize) + q.head = h + q.tail = h + q.sliceSize = maxInitialSize + } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { + n := newNode(maxInternalSliceSize) + q.tail.n = n + q.tail = n + q.sliceSize = maxInternalSliceSize + } + + q.tail.w = append(q.tail.w, w) + + atomic.AddUint64(&q.len, 1) + + q.mu.Unlock() +} + +func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { + q.mu.Lock() + + if q.head == nil { + return nil, nil + } + + w := q.head.w[q.curr] + q.head.w[q.curr] = nil + atomic.AddUint64(&q.len, ^uint64(0)) + atomic.AddUint64(&q.curr, 1) + + if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { + n := q.head.n + q.head.n = nil + q.head = n + q.curr = 0 + } + + q.mu.Unlock() + + return w, nil +} + +func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) { + +} + +func (q *Queue) Destroy() {} + +func newNode(capacity int) *Node { + return &Node{w: make([]worker.BaseProcess, 0, capacity)} +} diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go deleted file mode 100644 index 24b5fa6d..00000000 --- a/pkg/worker_watcher/container/vec.go +++ /dev/null @@ -1,51 +0,0 @@ -package container - -import ( - "context" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -type Vec struct { - destroy uint64 - workers chan worker.BaseProcess -} - -func NewVector(initialNumOfWorkers uint64) *Vec { - vec := &Vec{ - destroy: 0, - workers: make(chan worker.BaseProcess, initialNumOfWorkers), - } - - return vec -} - -func (v *Vec) Enqueue(w worker.BaseProcess) { - v.workers <- w -} - -func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { - /* - if *addr == old { - *addr = new - return true - } - */ - - if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { - return nil, errors.E(errors.WatcherStopped) - } - - select { - case w := <-v.workers: - return w, nil - case <-ctx.Done(): - return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) - } -} - -func (v *Vec) Destroy() { - atomic.StoreUint64(&v.destroy, 1) -} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b2d61d48..348be199 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -8,45 +8,54 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" ) // Vector interface represents vector container type Vector interface { - // Enqueue used to put worker to the vector - Enqueue(worker.BaseProcess) - // Dequeue used to get worker from the vector - Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Push used to put worker to the vector + Push(worker.BaseProcess) + // Pop used to get worker from the vector + Pop(ctx context.Context) (worker.BaseProcess, error) + // Remove worker with provided pid + Remove(pid int64) // Destroy used to stop releasing the workers Destroy() + + // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation + // Replace(prevPid int64, newWorker worker.BaseProcess) +} + +type workerWatcher struct { + sync.RWMutex + container Vector + // used to control Destroy stage (that all workers are in the container) + numWorkers uint64 + + workers []worker.BaseProcess + + allocator worker.Allocator + events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ - container: container.NewVector(numWorkers), + container: channel.NewVector(numWorkers), numWorkers: numWorkers, - workers: make([]worker.BaseProcess, 0, numWorkers), - allocator: allocator, - events: events, + + workers: make([]worker.BaseProcess, 0, numWorkers), + + allocator: allocator, + events: events, } return ww } -type workerWatcher struct { - sync.RWMutex - container Vector - // used to control the Destroy stage (that all workers are in the container) - numWorkers uint64 - workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler -} - func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.container.Enqueue(workers[i]) + ww.container.Push(workers[i]) // add worker to watch slice ww.workers = append(ww.workers, workers[i]) @@ -57,12 +66,12 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } -// Get is not a thread safe operation -func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { +// Take is not a thread safe operation +func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation - w, err := ww.container.Dequeue(ctx) + w, err := ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) } @@ -78,11 +87,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { // ========================================================= // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container + _ = w.Kill() + // no free workers in the container or worker not in the ReadyState (TTL-ed) // try to continuously get free one for { - w, err = ww.container.Dequeue(ctx) + w, err = ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) @@ -98,7 +107,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { case worker.StateReady: return w, nil case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work + ww.container.Push(w) // put it back, let worker finish the work continue case // all the possible wrong states @@ -135,7 +144,7 @@ func (ww *workerWatcher) Allocate() error { // unlock Allocate mutex ww.Unlock() // push the worker to the container - ww.Push(sw) + ww.Release(sw) return nil } @@ -158,11 +167,11 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { } } -// Push O(1) operation -func (ww *workerWatcher) Push(w worker.BaseProcess) { +// Release O(1) operation +func (ww *workerWatcher) Release(w worker.BaseProcess) { switch w.State().Value() { case worker.StateReady: - ww.container.Enqueue(w) + ww.container.Push(w) default: _ = w.Kill() } @@ -226,13 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } + // remove worker + ww.Remove(w) + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } - ww.Remove(w) + // set state as stopped + w.State().Set(worker.StateStopped) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ |