summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-05 01:07:26 +0300
committerValery Piashchynski <[email protected]>2021-02-05 01:07:26 +0300
commit5e012c6f2c822858b63638325804524250992a42 (patch)
tree6832f8c5079c098d001792071b03d5ca23f22374
parentd629f08408a4478aaba90079a4e37ab69cfc12ef (diff)
handle worker state before sending to the exec
-rwxr-xr-xpkg/pool/static_pool.go11
-rwxr-xr-xpkg/pool/static_pool_test.go2
-rwxr-xr-xpkg/worker/state.go9
-rwxr-xr-xpkg/worker/worker.go11
-rw-r--r--pkg/worker_watcher/stack_test.go2
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go53
6 files changed, 50 insertions, 38 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 72c3d4df..23f24e27 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -45,8 +45,8 @@ type StaticPool struct {
// allocate new worker
allocator worker.Allocator
- // errEncoder is the default Exec error encoder
- errEncoder ErrorEncoder
+ // err_encoder is the default Exec error encoder
+ err_encoder ErrorEncoder //nolint
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -88,7 +88,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return nil, errors.E(op, err)
}
- p.errEncoder = defaultErrEncoder(p)
+ p.err_encoder = defaultErrEncoder(p)
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
@@ -144,14 +144,13 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
rsp, err := w.Exec(p)
if err != nil {
- return sp.errEncoder(err, w)
+ return sp.err_encoder(err, w)
}
// worker want's to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sp.stopWorker(w)
-
return sp.Exec(p)
}
@@ -175,7 +174,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
rsp, err := w.ExecWithTimeout(ctx, p)
if err != nil {
- return sp.errEncoder(err, w)
+ return sp.err_encoder(err, w)
}
// worker want's to be terminated
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 30a4ebaf..8b1bf6a9 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -466,7 +466,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
}
_, err = p.Exec(payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
+ assert.NoError(t, err)
p.Destroy(ctx)
}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 54f76c09..c5d70a21 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -4,6 +4,7 @@ import (
"sync/atomic"
)
+// SYNC WITH worker_watcher.GET
const (
// StateInactive - no associated process
StateInactive int64 = iota
@@ -59,10 +60,18 @@ func (s *StateImpl) String() string {
return "working"
case StateInvalid:
return "invalid"
+ case StateStopping:
+ return "stopping"
case StateStopped:
return "stopped"
+ case StateKilling:
+ return "killing"
case StateErrored:
return "errored"
+ case StateDestroyed:
+ return "destroyed"
+ case StateRemove:
+ return "remove"
}
return "undefined"
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index f7e8008f..c1e56f3a 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -15,15 +15,6 @@ import (
"go.uber.org/multierr"
)
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 25 * time.Millisecond
-
- // ReadBufSize used to make a slice with specified length to read from stderr
- ReadBufSize = 10240 // Kb
-)
-
type Options func(p *Process)
// Process - supervised process with api over goridge.Relay.
@@ -201,7 +192,7 @@ func (w *Process) Stop() error {
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
w.state.Set(StateKilling)
- return multierr.Append(err, w.cmd.Process.Kill())
+ return multierr.Append(err, w.cmd.Process.Signal(os.Kill))
}
w.state.Set(StateStopped)
return nil
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
index 5287a6dc..5fc45adc 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/stack_test.go
@@ -140,3 +140,5 @@ func TestStack_DestroyWithWait(t *testing.T) {
stack.Destroy(context.Background())
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
}
+
+
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 6b9e9dbf..93db7317 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -48,26 +48,37 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == worker.StateRemove {
- err := ww.Remove(w)
- if err != nil {
- return nil, err
- }
- // try to get next
- return ww.Get(ctx)
- }
-
- // if worker not in the ready state it possibly corrupted
- if w != nil && w.State().Value() != worker.StateReady {
- err := ww.Remove(w)
- if err != nil {
- return nil, err
+ if w != nil {
+ switch w.State().Value() {
+ case worker.StateRemove:
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, err
+ }
+ // try to get next
+ return ww.Get(ctx)
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateWorking, // ??? how
+ worker.StateStopping:
+ // worker doing no work because it in the stack
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // and recursively try to get the next worker
+ return ww.Get(ctx)
+ // return only workers in the Ready state
+ case worker.StateReady:
+ return w, nil
}
- // try to get next
- return ww.Get(ctx)
}
- // no free stack
+ // no free workers in the stack
if w == nil {
for {
select {
@@ -104,15 +115,15 @@ func (ww *workerWatcher) Allocate() error {
return nil
}
+// Remove
func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
const op = errors.Op("worker_watcher_remove_worker")
- pid := wb.Pid()
-
- if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(worker.StateRemove)
+ // set remove state
+ wb.State().Set(worker.StateRemove)
+ if ww.stack.FindAndRemoveByPid(wb.Pid()) {
err := wb.Kill()
if err != nil {
return errors.E(op, err)