diff options
author | Valery Piashchynski <[email protected]> | 2021-03-28 14:00:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-03-28 14:00:54 +0300 |
commit | 2a58b1be2c79f2fe10c0a429878937661645a928 (patch) | |
tree | f3a7cd472c75c4dd2a97bcf97cb154731ed81230 /pkg | |
parent | 970014530a23d57a3be41c6369ac6456d0b36ae1 (diff) |
- Fix bug with the worker reallocating during the response
- Update .golangci and fix new warnings
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/config.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 43 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 10 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 1 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 2 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 2 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 14 | ||||
-rw-r--r-- | pkg/transport/socket/socket_factory_spawn_test.go | 56 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 42 | ||||
-rwxr-xr-x | pkg/worker/state.go | 3 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 4 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 1 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack.go | 143 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack_test.go | 143 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 73 |
15 files changed, 125 insertions, 414 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go index 782f7ce9..2a3dabe4 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -5,7 +5,7 @@ import ( "time" ) -// Configures the pool behaviour. +// Configures the pool behavior. type Config struct { // Debug flag creates new fresh worker before every request. Debug bool diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 0617cbc0..c8e45b82 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -47,7 +47,7 @@ type StaticPool struct { allocator worker.Allocator // err_encoder is the default Exec error encoder - err_encoder ErrorEncoder //nolint + err_encoder ErrorEncoder //nolint:golint,stylecheck } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -159,11 +159,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { return sp.Exec(p) } - err = sp.checkMaxJobs(w) - if err != nil { - return payload.Payload{}, errors.E(op, err) + if sp.cfg.MaxJobs != 0 { + sp.checkMaxJobs(w) + return rsp, nil } - + // return worker back + sp.ww.Push(w) return rsp, nil } @@ -188,11 +189,13 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo return sp.execWithTTL(ctx, p) } - err = sp.checkMaxJobs(w) - if err != nil { - return payload.Payload{}, errors.E(op, err) + if sp.cfg.MaxJobs != 0 { + sp.checkMaxJobs(w) + return rsp, nil } + // return worker back + sp.ww.Push(w) return rsp, nil } @@ -206,19 +209,15 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error { - const op = errors.Op("static_pool_check_max_jobs") - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - w.State().Set(worker.StateDestroyed) - sp.ww.Remove(w) - err := sp.ww.Allocate() - if err != nil { - return errors.E(op, err) - } - } else { +//go:inline +func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { + if w.State().NumExecs() >= sp.cfg.MaxJobs { + w.State().Set(worker.StateMaxJobsReached) sp.ww.Push(w) + return } - return nil + + sp.ww.Push(w) } func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { @@ -281,9 +280,9 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.SyncWorker, error) { - ctx, cancel := context.WithTimeout(ctx, timeout) + ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) + w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) if err != nil { return nil, err } @@ -316,7 +315,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { // allocate required number of stack func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") - var workers []worker.BaseProcess + workers := make([]worker.BaseProcess, 0, numWorkers) // constant number of stack simplify logic for i := uint64(0); i < numWorkers; i++ { diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index b1318f9d..2d8aad48 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -213,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } } - var cfg = Config{ + var cfg2 = Config{ NumWorkers: 1, AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -223,7 +223,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - cfg, + cfg2, AddListeners(listener), ) assert.NoError(t, err) @@ -361,7 +361,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { pipe.NewPipeFactory(), Config{ NumWorkers: 1, - AllocateTimeout: time.Second, + AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, ) @@ -432,8 +432,8 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.Exec(payload.Payload{Body: []byte("100")}) - if err != nil { + _, errP := p.Exec(payload.Payload{Body: []byte("100")}) + if errP != nil { t.Errorf("error executing payload: error %v", err) } }() diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 5abeae7a..273adc30 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -166,7 +166,6 @@ func (sp *supervised) Stop() { func (sp *supervised) control() { now := time.Now() - const op = errors.Op("supervised_pool_control_tick") // MIGHT BE OUTDATED // It's a copy of the Workers pointers diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index a00b2117..51befb1e 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -281,7 +281,7 @@ func Test_Echo2(t *testing.T) { assert.NoError(t, sw.Wait()) }() defer func() { - err := sw.Stop() + err = sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index fb40ecb0..3ef65be8 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -299,7 +299,7 @@ func Test_Echo(t *testing.T) { assert.NoError(t, sw.Wait()) }() defer func() { - err := sw.Stop() + err = sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index f58f9561..990eb384 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -88,7 +88,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) go func() { - ctx, cancel := context.WithTimeout(ctx, f.tout) + ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { @@ -108,7 +108,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis return } - rl, err := f.findRelayWithContext(ctx, w) + rl, err := f.findRelayWithContext(ctxT, w) if err != nil { err = multierr.Combine( err, @@ -179,18 +179,19 @@ func (f *Factory) Close() error { // waits for Process to connect over socket and returns associated relay of timeout func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { - ticker := time.NewTicker(time.Millisecond * 100) + ticker := time.NewTicker(time.Millisecond * 10) for { select { case <-ctx.Done(): return nil, ctx.Err() case <-ticker.C: + // check for the process exists _, err := process.NewProcess(int32(w.Pid())) if err != nil { return nil, err } default: - tmp, ok := f.relays.Load(w.Pid()) + tmp, ok := f.relays.LoadAndDelete(w.Pid()) if !ok { continue } @@ -221,8 +222,3 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } - -// deletes relay chan associated with specific pid -func (f *Factory) removeRelayFromPid(pid int64) { - f.relays.Delete(pid) -} diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 1361693b..b875e2c8 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -19,8 +19,8 @@ func Test_Tcp_Start2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -55,7 +55,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { f := NewSocketServer(ls, time.Minute) defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -75,8 +75,8 @@ func Test_Tcp_StartError2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -131,8 +131,8 @@ func Test_Tcp_Invalid2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -151,8 +151,8 @@ func Test_Tcp_Broken2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -181,8 +181,8 @@ func Test_Tcp_Broken2(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -206,8 +206,8 @@ func Test_Tcp_Echo2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -244,7 +244,7 @@ func Test_Unix_Start2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -268,7 +268,7 @@ func Test_Unix_Failboot2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -295,7 +295,7 @@ func Test_Unix_Timeout2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -311,7 +311,7 @@ func Test_Unix_Invalid2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -326,8 +326,8 @@ func Test_Unix_Broken2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() - assert.NoError(t, err) + errC := ls.Close() + assert.NoError(t, errC) }() cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") @@ -351,8 +351,8 @@ func Test_Unix_Broken2(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -376,7 +376,7 @@ func Test_Unix_Echo2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -412,7 +412,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(b, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(b, err) }() @@ -439,7 +439,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(b, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(b, err) }() @@ -472,8 +472,8 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { b.Errorf("error closing the listener: error %v", err) } }() @@ -503,8 +503,8 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { b.Errorf("error closing the listener: error %v", err) } }() diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index a8dd0fe0..34fe088b 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -22,7 +22,7 @@ func Test_Tcp_Start(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -60,7 +60,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { f := NewSocketServer(ls, time.Minute) defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -82,7 +82,7 @@ func Test_Tcp_StartError(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -143,7 +143,7 @@ func Test_Tcp_Timeout(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -166,7 +166,7 @@ func Test_Tcp_Invalid(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -188,8 +188,8 @@ func Test_Tcp_Broken(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -218,8 +218,8 @@ func Test_Tcp_Broken(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -245,7 +245,7 @@ func Test_Tcp_Echo(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -284,7 +284,7 @@ func Test_Unix_Start(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -314,7 +314,7 @@ func Test_Unix_Failboot(t *testing.T) { ctx := context.Background() if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -347,7 +347,7 @@ func Test_Unix_Timeout(t *testing.T) { ctx := context.Background() if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -369,7 +369,7 @@ func Test_Unix_Invalid(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -390,8 +390,8 @@ func Test_Unix_Broken(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -422,8 +422,8 @@ func Test_Unix_Broken(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -448,7 +448,7 @@ func Test_Unix_Echo(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -559,7 +559,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { b.Errorf("error closing the listener: error %v", err) } @@ -588,7 +588,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { b.Errorf("error closing the listener: error %v", err) } diff --git a/pkg/worker/state.go b/pkg/worker/state.go index 176e151b..502f8199 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -27,6 +27,9 @@ const ( // State of worker, when no need to allocate new one StateDestroyed + // State of worker, when it reached executions limit + StateMaxJobsReached + // StateStopped - process has been terminated. StateStopped diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index ac987c14..7a1f3131 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -44,7 +44,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.SoftJob, err) == false { + if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } @@ -91,7 +91,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.SoftJob, err) == false { + if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 0f7ab755..b04e1363 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -46,7 +46,6 @@ type Process struct { // InitBaseWorker creates new Process over given exec.cmd. func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { - const op = errors.Op("init_base_worker") if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } diff --git a/pkg/worker_watcher/container/stack.go b/pkg/worker_watcher/container/stack.go deleted file mode 100644 index fb8ecd3b..00000000 --- a/pkg/worker_watcher/container/stack.go +++ /dev/null @@ -1,143 +0,0 @@ -package container - -import ( - "context" - "runtime" - "sync" - "time" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -type Stack struct { - sync.RWMutex - workers []worker.BaseProcess - destroy bool - actualNumOfWorkers uint64 - initialNumOfWorkers uint64 -} - -func NewWorkersStack(initialNumOfWorkers uint64) *Stack { - w := runtime.NumCPU() - return &Stack{ - workers: make([]worker.BaseProcess, 0, w), - actualNumOfWorkers: 0, - initialNumOfWorkers: initialNumOfWorkers, - } -} - -func (stack *Stack) Reset() { - stack.Lock() - defer stack.Unlock() - stack.actualNumOfWorkers = 0 - stack.workers = nil -} - -// Push worker back to the vec -// If vec in destroy state, Push will provide 100ms window to unlock the mutex -func (stack *Stack) Push(w worker.BaseProcess) { - stack.Lock() - defer stack.Unlock() - stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) -} - -func (stack *Stack) IsEmpty() bool { - stack.Lock() - defer stack.Unlock() - return len(stack.workers) == 0 -} - -func (stack *Stack) Pop() (worker.BaseProcess, bool) { - stack.Lock() - defer stack.Unlock() - - // do not release new vec - if stack.destroy { - return nil, true - } - - if len(stack.workers) == 0 { - return nil, false - } - - // move worker - w := stack.workers[len(stack.workers)-1] - stack.workers = stack.workers[:len(stack.workers)-1] - stack.actualNumOfWorkers-- - return w, false -} - -func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.Lock() - defer stack.Unlock() - for i := 0; i < len(stack.workers); i++ { - // worker in the vec, reallocating - if stack.workers[i].Pid() == pid { - stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) - stack.actualNumOfWorkers-- - // worker found and removed - return true - } - } - // no worker with such ID - return false -} - -// Workers return copy of the workers in the vec -func (stack *Stack) Workers() []worker.BaseProcess { - stack.Lock() - defer stack.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) - // copy - // TODO pointers, copy have no sense - for _, v := range stack.workers { - if v != nil { - workersCopy = append(workersCopy, v) - } - } - - return workersCopy -} - -func (stack *Stack) isDestroying() bool { - stack.Lock() - defer stack.Unlock() - return stack.destroy -} - -// we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(_ context.Context) { - stack.Lock() - stack.destroy = true - stack.Unlock() - - tt := time.NewTicker(time.Millisecond * 500) - defer tt.Stop() - for { - select { - case <-tt.C: - stack.Lock() - // that might be one of the workers is working - if stack.initialNumOfWorkers != stack.actualNumOfWorkers { - stack.Unlock() - continue - } - stack.Unlock() - // unnecessary mutex, but - // just to make sure. All vec at this moment are in the vec - // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.Lock() - for i := 0; i < len(stack.workers); i++ { - // set state for the vec in the vec (unused at the moment) - stack.workers[i].State().Set(worker.StateDestroyed) - // kill the worker - _ = stack.workers[i].Kill() - } - stack.Unlock() - // clear - stack.Reset() - return - } - } -} diff --git a/pkg/worker_watcher/container/stack_test.go b/pkg/worker_watcher/container/stack_test.go deleted file mode 100644 index d699664c..00000000 --- a/pkg/worker_watcher/container/stack_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package container - -import ( - "context" - "os/exec" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func TestNewWorkersStack(t *testing.T) { - stack := NewWorkersStack(0) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []worker.BaseProcess{}, stack.workers) -} - -func TestStack_Push(t *testing.T) { - stack := NewWorkersStack(1) - - w, err := worker.InitBaseWorker(&exec.Cmd{}) - assert.NoError(t, err) - - sw := worker.From(w) - - stack.Push(sw) - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) -} - -func TestStack_Pop(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - - sw := worker.From(w) - - stack.Push(sw) - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - - _, _ = stack.Pop() - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_FindAndRemoveByPid(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - - assert.NoError(t, w.Start()) - - sw := worker.From(w) - - stack.Push(sw) - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - - stack.FindAndRemoveByPid(w.Pid()) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_IsEmpty(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - - sw := worker.From(w) - stack.Push(sw) - - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - - assert.Equal(t, false, stack.IsEmpty()) -} - -func TestStack_Workers(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - - wrks := stack.Workers() - assert.Equal(t, 1, len(wrks)) - assert.Equal(t, w.Pid(), wrks[0].Pid()) -} - -func TestStack_Reset(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - stack.Reset() - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_Destroy(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - - stack.Destroy(context.Background()) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_DestroyWithWait(t *testing.T) { - stack := NewWorkersStack(2) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - stack.Push(sw) - assert.Equal(t, uint64(2), stack.actualNumOfWorkers) - - go func() { - wrk, _ := stack.Pop() - time.Sleep(time.Second * 3) - stack.Push(wrk) - }() - time.Sleep(time.Second) - stack.Destroy(context.Background()) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cc8cc2b6..a6dfe43e 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -79,47 +79,44 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { } // ========================================================= // SLOW PATH - _ = w.Kill() + _ = w.Kill() // how the worker get here??????? // no free workers in the container // try to continuously get free one for { - select { - default: - w, stop = ww.container.Dequeue() - if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } + w, stop = ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), } + } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - c <- get{ - w, - nil, - } - return - case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } } }() @@ -177,6 +174,10 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { // O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { + if w.State().Value() != worker.StateReady { + _ = w.Kill() + return + } ww.container.Enqueue(w) } @@ -190,7 +191,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() - for { + for { //nolint:gosimple select { case <-tt.C: ww.Lock() |