diff options
author | Valery Piashchynski <[email protected]> | 2021-02-05 01:07:26 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-05 01:07:26 +0300 |
commit | 5e012c6f2c822858b63638325804524250992a42 (patch) | |
tree | 6832f8c5079c098d001792071b03d5ca23f22374 | |
parent | d629f08408a4478aaba90079a4e37ab69cfc12ef (diff) |
handle worker state before sending to the exec
-rwxr-xr-x | pkg/pool/static_pool.go | 11 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 2 | ||||
-rwxr-xr-x | pkg/worker/state.go | 9 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 11 | ||||
-rw-r--r-- | pkg/worker_watcher/stack_test.go | 2 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 53 |
6 files changed, 50 insertions, 38 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 72c3d4df..23f24e27 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -45,8 +45,8 @@ type StaticPool struct { // allocate new worker allocator worker.Allocator - // errEncoder is the default Exec error encoder - errEncoder ErrorEncoder + // err_encoder is the default Exec error encoder + err_encoder ErrorEncoder //nolint } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -88,7 +88,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg return nil, errors.E(op, err) } - p.errEncoder = defaultErrEncoder(p) + p.err_encoder = defaultErrEncoder(p) // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { @@ -144,14 +144,13 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { rsp, err := w.Exec(p) if err != nil { - return sp.errEncoder(err, w) + return sp.err_encoder(err, w) } // worker want's to be terminated // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { sp.stopWorker(w) - return sp.Exec(p) } @@ -175,7 +174,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p rsp, err := w.ExecWithTimeout(ctx, p) if err != nil { - return sp.errEncoder(err, w) + return sp.err_encoder(err, w) } // worker want's to be terminated diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 30a4ebaf..8b1bf6a9 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -466,7 +466,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { } _, err = p.Exec(payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) + assert.NoError(t, err) p.Destroy(ctx) } diff --git a/pkg/worker/state.go b/pkg/worker/state.go index 54f76c09..c5d70a21 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -4,6 +4,7 @@ import ( "sync/atomic" ) +// SYNC WITH worker_watcher.GET const ( // StateInactive - no associated process StateInactive int64 = iota @@ -59,10 +60,18 @@ func (s *StateImpl) String() string { return "working" case StateInvalid: return "invalid" + case StateStopping: + return "stopping" case StateStopped: return "stopped" + case StateKilling: + return "killing" case StateErrored: return "errored" + case StateDestroyed: + return "destroyed" + case StateRemove: + return "remove" } return "undefined" diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index f7e8008f..c1e56f3a 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -15,15 +15,6 @@ import ( "go.uber.org/multierr" ) -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 25 * time.Millisecond - - // ReadBufSize used to make a slice with specified length to read from stderr - ReadBufSize = 10240 // Kb -) - type Options func(p *Process) // Process - supervised process with api over goridge.Relay. @@ -201,7 +192,7 @@ func (w *Process) Stop() error { err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { w.state.Set(StateKilling) - return multierr.Append(err, w.cmd.Process.Kill()) + return multierr.Append(err, w.cmd.Process.Signal(os.Kill)) } w.state.Set(StateStopped) return nil diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 5287a6dc..5fc45adc 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -140,3 +140,5 @@ func TestStack_DestroyWithWait(t *testing.T) { stack.Destroy(context.Background()) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } + + diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 6b9e9dbf..93db7317 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -48,26 +48,37 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == worker.StateRemove { - err := ww.Remove(w) - if err != nil { - return nil, err - } - // try to get next - return ww.Get(ctx) - } - - // if worker not in the ready state it possibly corrupted - if w != nil && w.State().Value() != worker.StateReady { - err := ww.Remove(w) - if err != nil { - return nil, err + if w != nil { + switch w.State().Value() { + case worker.StateRemove: + err := ww.Remove(w) + if err != nil { + return nil, err + } + // try to get next + return ww.Get(ctx) + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateWorking, // ??? how + worker.StateStopping: + // worker doing no work because it in the stack + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // and recursively try to get the next worker + return ww.Get(ctx) + // return only workers in the Ready state + case worker.StateReady: + return w, nil } - // try to get next - return ww.Get(ctx) } - // no free stack + // no free workers in the stack if w == nil { for { select { @@ -104,15 +115,15 @@ func (ww *workerWatcher) Allocate() error { return nil } +// Remove func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() const op = errors.Op("worker_watcher_remove_worker") - pid := wb.Pid() - - if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(worker.StateRemove) + // set remove state + wb.State().Set(worker.StateRemove) + if ww.stack.FindAndRemoveByPid(wb.Pid()) { err := wb.Kill() if err != nil { return errors.E(op, err) |