| author | Valery Piashchynski <[email protected]> | 2021-12-23 21:07:45 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-12-23 21:07:45 +0300 |
| commit | 9cbb6be27ca0bd56eaa6db9a875830a8ce6110e8 (patch) | |
| tree | 2ca6f5623ace05f083ef96c3b890c7bb00a7d7ee | |
| parent | 31112495808ae37f38f7b514de1f40b8b8a75238 (diff) | |
| parent | 671fe2c81c4d1962e96b074f7ddead8dd07a0ea5 (diff) | |
[#879]: feat(pool): add `Reset` method (tag: v2.6.3)
| -rw-r--r-- | CHANGELOG.md | 8 |
| -rw-r--r-- | pool/interface.go | 6 |
| -rwxr-xr-x | pool/static_pool.go | 130 |
| -rwxr-xr-x | pool/static_pool_test.go | 32 |
| -rwxr-xr-x | pool/supervisor_pool.go | 6 |
| -rw-r--r-- | pool/supervisor_test.go | 27 |
| -rwxr-xr-x | worker/worker.go | 1 |
| -rw-r--r-- | worker_watcher/container/channel/vec.go | 17 |
| -rwxr-xr-x | worker_watcher/worker_watcher.go | 61 |
9 files changed, 219 insertions, 69 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 702099af..a15c2d2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 # CHANGELOG
 
+# v2.6.3 (23.12.2021)
+
+## 👀 New:
+
+- ✏️ Add `Reset` method to the static pool interface. Should be used to reset the pool instead of destroying and replacing it.
+
+---
+
 # v2.6.2 (15.12.2021)
 
 ## 🩹 Fixes:
diff --git a/pool/interface.go b/pool/interface.go
index d089092f..6a150188 100644
--- a/pool/interface.go
+++ b/pool/interface.go
@@ -21,6 +21,9 @@ type Pool interface {
 	// RemoveWorker removes worker from the pool.
 	RemoveWorker(worker worker.BaseProcess) error
 
+	// Reset kill all workers inside the watcher and replaces with new
+	Reset(ctx context.Context) error
+
 	// Destroy all underlying stack (but let them to complete the task).
 	Destroy(ctx context.Context)
 
@@ -45,6 +48,9 @@ type Watcher interface {
 	// Destroy destroys the underlying container
 	Destroy(ctx context.Context)
 
+	// Reset will replace container and workers array, kill all workers
+	Reset(ctx context.Context)
+
 	// List return all container w/o removing it from internal storage
 	List() []worker.BaseProcess
 
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 4906788f..7481f84f 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -156,6 +156,79 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
 	return rsp, nil
 }
 
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+	sp.events.Unsubscribe(sp.eventsID)
+	sp.ww.Destroy(ctx)
+}
+
+func (sp *StaticPool) Reset(ctx context.Context) error {
+	// destroy all workers
+	sp.ww.Reset(ctx)
+	workers, err := sp.allocateWorkers(sp.cfg.NumWorkers)
+	if err != nil {
+		return err
+	}
+	// add the NEW workers to the watcher
+	err = sp.ww.Watch(workers)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+	return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
+		// just push event if on any stage was timeout error
+		switch {
+		case errors.Is(errors.ExecTTL, err):
+			sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+			w.State().Set(worker.StateInvalid)
+			return nil, err
+
+		case errors.Is(errors.SoftJob, err):
+			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+			// if max jobs exceed
+			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+				// mark old as invalid and stop
+				w.State().Set(worker.StateInvalid)
+				errS := w.Stop()
+				if errS != nil {
+					return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+				}
+
+				return nil, err
+			}
+
+			// soft jobs errors are allowed, just put the worker back
+			sp.ww.Release(w)
+
+			return nil, err
+		case errors.Is(errors.Network, err):
+			// in case of network error, we can't stop the worker, we should kill it
+			w.State().Set(worker.StateInvalid)
+			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+			// kill the worker instead of sending net packet to it
+			_ = w.Kill()
+
+			return nil, err
+		default:
+			w.State().Set(worker.StateInvalid)
+			sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+			// stop the worker, worker here might be in the broken state (network)
+			errS := w.Stop()
+			if errS != nil {
+				return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
+			}
+
+			return nil, err
+		}
+	}
+}
+
 // Be careful, sync with pool.Exec method
 func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
 	const op = errors.Op("static_pool_exec_with_context")
@@ -225,63 +298,6 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
 	return w, nil
 }
 
-// Destroy all underlying stack (but let them complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
-	sp.events.Unsubscribe(sp.eventsID)
-	sp.ww.Destroy(ctx)
-}
-
-func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
-	return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
-		// just push event if on any stage was timeout error
-		switch {
-		case errors.Is(errors.ExecTTL, err):
-			sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
-			w.State().Set(worker.StateInvalid)
-			return nil, err
-
-		case errors.Is(errors.SoftJob, err):
-			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
-			// if max jobs exceed
-			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
-				// mark old as invalid and stop
-				w.State().Set(worker.StateInvalid)
-				errS := w.Stop()
-				if errS != nil {
-					return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
-				}
-
-				return nil, err
-			}
-
-			// soft jobs errors are allowed, just put the worker back
-			sp.ww.Release(w)
-
-			return nil, err
-		case errors.Is(errors.Network, err):
-			// in case of network error, we can't stop the worker, we should kill it
-			w.State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
-			// kill the worker instead of sending net packet to it
-			_ = w.Kill()
-
-			return nil, err
-		default:
-			w.State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-			// stop the worker, worker here might be in the broken state (network)
-			errS := w.Stop()
-			if errS != nil {
-				return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
-			}
-
-			return nil, err
-		}
-	}
-}
-
 func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
 	return func() (worker.SyncWorker, error) {
 		ctxT, cancel := context.WithTimeout(ctx, timeout)
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 8e2667ac..a45aa29d 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -42,6 +42,34 @@ func Test_NewPool(t *testing.T) {
 	assert.NotNil(t, p)
 }
 
+func Test_NewPoolReset(t *testing.T) {
+	ctx := context.Background()
+	p, err := Initialize(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+		pipe.NewPipeFactory(),
+		cfg,
+	)
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+
+	w := p.Workers()
+	if len(w) == 0 {
+		t.Fatal("should be workers inside")
+	}
+
+	pid := w[0].Pid()
+	require.NoError(t, p.Reset(context.Background()))
+
+	w2 := p.Workers()
+	if len(w2) == 0 {
+		t.Fatal("should be workers inside")
+	}
+
+	require.NotEqual(t, pid, w2[0].Pid())
+	p.Destroy(ctx)
+}
+
 func Test_StaticPool_Invalid(t *testing.T) {
 	p, err := Initialize(
 		context.Background(),
@@ -67,6 +95,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
 
 	assert.NotNil(t, p)
 	assert.NoError(t, err)
+	p.Destroy(context.Background())
 }
 
 func Test_StaticPool_Echo(t *testing.T) {
@@ -562,7 +591,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
 
 func Test_CRC_WithPayload(t *testing.T) {
 	ctx := context.Background()
-	_, err := Initialize(
+	p, err := Initialize(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
 		pipe.NewPipeFactory(),
@@ -571,6 +600,7 @@ func Test_CRC_WithPayload(t *testing.T) {
 	assert.Error(t, err)
 	data := err.Error()
 	assert.Contains(t, data, "warning: some weird php erro")
+	require.Nil(t, p)
 }
 
 /* PTR:
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 5f87c8a4..0502dc9a 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -51,6 +51,12 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
 	panic("used to satisfy pool interface")
 }
 
+func (sp *supervised) Reset(ctx context.Context) error {
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	return sp.pool.Reset(ctx)
+}
+
 func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
 	const op = errors.Op("supervised_exec_with_context")
 	if sp.cfg.ExecTTL == 0 {
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 98af918a..6e8ab552 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -58,6 +58,33 @@ func TestSupervisedPool_Exec(t *testing.T) {
 	p.Destroy(context.Background())
 }
 
+func Test_SupervisedPoolReset(t *testing.T) {
+	ctx := context.Background()
+	p, err := Initialize(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+		pipe.NewPipeFactory(),
+		cfgSupervised,
+	)
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+
+	w := p.Workers()
+	if len(w) == 0 {
+		t.Fatal("should be workers inside")
+	}
+
+	pid := w[0].Pid()
+	require.NoError(t, p.Reset(context.Background()))
+
+	w2 := p.Workers()
+	if len(w2) == 0 {
+		t.Fatal("should be workers inside")
+	}
+
+	require.NotEqual(t, pid, w2[0].Pid())
+}
+
 // This test should finish without freezes
 func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
 	var cfgSupervised = cfgSupervised
diff --git a/worker/worker.go b/worker/worker.go
index 564d83c4..b2689c59 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -223,7 +223,6 @@ func (w *Process) Kill() error {
 		return err
 	}
 	w.state.Set(StateStopped)
-	w.events.Unsubscribe(w.eventsID)
 	return nil
 }
 
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go
index c0c27575..65c2066e 100644
--- a/worker_watcher/container/channel/vec.go
+++ b/worker_watcher/container/channel/vec.go
@@ -98,15 +98,16 @@ func (v *Vec) Push(w worker.BaseProcess) {
 func (v *Vec) Remove(_ int64) {}
 
 func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
-	/*
-		if *addr == old {
-			*addr = new
-			return true
-		}
-	*/
-
 	if atomic.LoadUint64(&v.destroy) == 1 {
-		return nil, errors.E(errors.WatcherStopped)
+		// drain channel
+		for {
+			select {
+			case <-v.workers:
+				continue
+			default:
+				return nil, errors.E(errors.WatcherStopped)
+			}
+		}
 	}
 
 	// used only for the TTL-ed workers
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index e59d9feb..cfde9931 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -82,7 +82,6 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
 
 // Take is not a thread safe operation
 func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
 	const op = errors.Op("worker_watcher_get_free_worker")
-	// thread safe operation
 	w, err := ww.container.Pop(ctx)
 	if err != nil {
@@ -222,6 +221,59 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
 	}
 }
 
+func (ww *workerWatcher) Reset(ctx context.Context) {
+	// destroy container, we don't use ww mutex here, since we should be able to push worker
+	ww.Lock()
+	// do not release new workers
+	ww.container.Destroy()
+	ww.Unlock()
+
+	tt := time.NewTicker(time.Millisecond * 10)
+	defer tt.Stop()
+	for {
+		select {
+		case <-tt.C:
+			ww.RLock()
+			// that might be one of the workers is working
+			if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
+				ww.RUnlock()
+				continue
+			}
+			ww.RUnlock()
+			// All container at this moment are in the container
+			// Pop operation is blocked, push can't be done, since it's not possible to pop
+			ww.Lock()
+
+			// drain channel
+			_, _ = ww.container.Pop(ctx)
+			for i := 0; i < len(ww.workers); i++ {
+				ww.workers[i].State().Set(worker.StateDestroyed)
+				// kill the worker
+				_ = ww.workers[i].Kill()
+			}
+
+			ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers))
+			ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers))
+			ww.Unlock()
+			return
+		case <-ctx.Done():
+			// drain channel
+			_, _ = ww.container.Pop(ctx)
+			// kill workers
+			ww.Lock()
+			for i := 0; i < len(ww.workers); i++ {
+				ww.workers[i].State().Set(worker.StateDestroyed)
+				// kill the worker
+				_ = ww.workers[i].Kill()
+			}
+
+			ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers))
+			ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers))
+			ww.Unlock()
+		}
+	}
+}
+
 // Destroy all underlying container (but let them complete the task)
 func (ww *workerWatcher) Destroy(ctx context.Context) {
 	// destroy container, we don't use ww mutex here, since we should be able to push worker
@@ -231,7 +283,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
 	ww.Unlock()
 	ww.events.Unsubscribe(ww.eventsID)
 
-	tt := time.NewTicker(time.Millisecond * 100)
+	tt := time.NewTicker(time.Millisecond * 10)
 	defer tt.Stop()
 	for {
 		select {
@@ -246,6 +298,8 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
 			// All container at this moment are in the container
 			// Pop operation is blocked, push can't be done, since it's not possible to pop
 			ww.Lock()
+			// drain channel
+			_, _ = ww.container.Pop(ctx)
 			for i := 0; i < len(ww.workers); i++ {
 				ww.workers[i].State().Set(worker.StateDestroyed)
 				// kill the worker
@@ -254,10 +308,13 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
 			ww.Unlock()
 			return
 		case <-ctx.Done():
+			// drain channel
+			_, _ = ww.container.Pop(ctx)
 			// kill workers
 			ww.Lock()
 			for i := 0; i < len(ww.workers); i++ {
 				ww.workers[i].State().Set(worker.StateDestroyed)
+				// kill the worker
 				_ = ww.workers[i].Kill()
 			}
 			ww.Unlock()
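For context, here is a minimal usage sketch of the new `Reset` method, adapted from `Test_NewPoolReset` above. It is not part of the commit: the import paths assume the `github.com/spiral/roadrunner/v2` module layout of the v2.6.x line, and the `Config` values and PHP worker script name are illustrative.

```go
package main

import (
	"context"
	"log"
	"os/exec"

	// Assumed module paths for the v2.6.x SDK; adjust to your module layout.
	"github.com/spiral/roadrunner/v2/pool"
	"github.com/spiral/roadrunner/v2/transport/pipe"
)

func main() {
	ctx := context.Background()

	// Allocate a static pool of PHP workers over pipes (script name is illustrative).
	p, err := pool.Initialize(
		ctx,
		func() *exec.Cmd { return exec.Command("php", "worker.php") },
		pipe.NewPipeFactory(),
		&pool.Config{NumWorkers: 4},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Destroy(ctx)

	before := p.Workers()[0].Pid()

	// Reset kills every worker inside the watcher and allocates a fresh set:
	// the pool value stays usable, but the worker PIDs change.
	if err := p.Reset(ctx); err != nil {
		log.Fatal(err)
	}

	after := p.Workers()[0].Pid()
	log.Printf("worker pid before reset: %d, after: %d", before, after)
}
```

Compared to calling `Destroy` and re-initializing, `Reset` keeps the pool and its event subscription alive and only swaps the worker processes, which is what the new `worker_watcher.Reset` in this diff implements.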