author     Valery Piashchynski <[email protected]>   2021-12-23 21:07:45 +0300
committer  GitHub <[email protected]>                        2021-12-23 21:07:45 +0300
commit     9cbb6be27ca0bd56eaa6db9a875830a8ce6110e8 (patch)
tree       2ca6f5623ace05f083ef96c3b890c7bb00a7d7ee
parent     31112495808ae37f38f7b514de1f40b8b8a75238 (diff)
parent     671fe2c81c4d1962e96b074f7ddead8dd07a0ea5 (diff)
[#879]: feat(pool): add `Reset` method (tag: v2.6.3)
-rw-r--r--   CHANGELOG.md                              |   8
-rw-r--r--   pool/interface.go                         |   6
-rwxr-xr-x   pool/static_pool.go                       | 130
-rwxr-xr-x   pool/static_pool_test.go                  |  32
-rwxr-xr-x   pool/supervisor_pool.go                   |   6
-rw-r--r--   pool/supervisor_test.go                   |  27
-rwxr-xr-x   worker/worker.go                          |   1
-rw-r--r--   worker_watcher/container/channel/vec.go   |  17
-rwxr-xr-x   worker_watcher/worker_watcher.go          |  61
9 files changed, 219 insertions, 69 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 702099af..a15c2d2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
# CHANGELOG
+# v2.6.3 (23.12.2021)
+
+## 👀 New:
+
+- ✏️ Add `Reset` method to the static pool interface. Should be used to reset the pool instead of destroying and replacing it.
+
+---
+
# v2.6.2 (15.12.2021)
## 🩹 Fixes:
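
The intended workflow, per the changelog entry above, is to recycle workers on a live pool with `Reset` rather than calling `Destroy` and building a new pool. A minimal usage sketch (editorial, not part of this commit), assuming the v2.6.x import paths and the Initialize/pipe factory API exercised by the tests further down; the worker script path is illustrative:

package main

import (
	"context"
	"log"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pool"
	"github.com/spiral/roadrunner/v2/transport/pipe"
)

func main() {
	ctx := context.Background()

	// spawn a static pool of PHP workers over pipes
	p, err := pool.Initialize(
		ctx,
		func() *exec.Cmd { return exec.Command("php", "psr-worker.php") }, // illustrative script
		pipe.NewPipeFactory(),
		&pool.Config{NumWorkers: 4, AllocateTimeout: 5 * time.Second, DestroyTimeout: 5 * time.Second},
	)
	if err != nil {
		log.Fatal(err)
	}

	// ... serve requests via p.Exec(...) ...

	// replace every worker in place instead of Destroy + Initialize
	if err := p.Reset(ctx); err != nil {
		log.Fatal(err)
	}

	// tear the pool down for real once finished
	p.Destroy(ctx)
}
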
diff --git a/pool/interface.go b/pool/interface.go
index d089092f..6a150188 100644
--- a/pool/interface.go
+++ b/pool/interface.go
@@ -21,6 +21,9 @@ type Pool interface {
// RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
+ // Reset kill all workers inside the watcher and replaces with new
+ Reset(ctx context.Context) error
+
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
@@ -45,6 +48,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
+ // Reset will replace container and workers array, kill all workers
+ Reset(ctx context.Context)
+
// List return all container w/o removing it from internal storage
List() []worker.BaseProcess
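
Both pool implementations now have to carry the widened interface; a compile-time assertion inside the pool package (an editorial sketch, not in this diff, assuming the StaticPool and supervised types shown below) turns a missing Reset into a build error rather than a runtime surprise:

package pool

// Compile-time checks: the build fails on this file if either implementation
// stops satisfying the Pool interface after the Reset addition.
var (
	_ Pool = (*StaticPool)(nil)
	_ Pool = (*supervised)(nil)
)
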
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 4906788f..7481f84f 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -156,6 +156,79 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
return rsp, nil
}
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.events.Unsubscribe(sp.eventsID)
+ sp.ww.Destroy(ctx)
+}
+
+func (sp *StaticPool) Reset(ctx context.Context) error {
+ // destroy all workers
+ sp.ww.Reset(ctx)
+ workers, err := sp.allocateWorkers(sp.cfg.NumWorkers)
+ if err != nil {
+ return err
+ }
+ // add the NEW workers to the watcher
+ err = sp.ww.Watch(workers)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
+ // just push event if on any stage was timeout error
+ switch {
+ case errors.Is(errors.ExecTTL, err):
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+ w.State().Set(worker.StateInvalid)
+ return nil, err
+
+ case errors.Is(errors.SoftJob, err):
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+ // if max jobs exceed
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
+ w.State().Set(worker.StateInvalid)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
+
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+ }
+}
+
// Be careful, sync with pool.Exec method
func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
@@ -225,63 +298,6 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.events.Unsubscribe(sp.eventsID)
- sp.ww.Destroy(ctx)
-}
-
-func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- // just push event if on any stage was timeout error
- switch {
- case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
- w.State().Set(worker.StateInvalid)
- return nil, err
-
- case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
- // if max jobs exceed
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // mark old as invalid and stop
- w.State().Set(worker.StateInvalid)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
-
- // soft jobs errors are allowed, just put the worker back
- sp.ww.Release(w)
-
- return nil, err
- case errors.Is(errors.Network, err):
- // in case of network error, we can't stop the worker, we should kill it
- w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
- // kill the worker instead of sending net packet to it
- _ = w.Kill()
-
- return nil, err
- default:
- w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
- // stop the worker, worker here might be in the broken state (network)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
- }
-}
-
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
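
defaultErrEncoder decides each worker's fate by error class: ExecTTL and Network errors invalidate or kill the worker, SoftJob errors normally release it back to the pool, and anything else stops it. The same classification can be reused on the calling side to decide whether a failed Exec is worth retrying; a hypothetical helper (not part of this commit), using the same github.com/spiral/errors package the pool relies on:

package main

import (
	"fmt"

	"github.com/spiral/errors"
)

// classifyExecErr mirrors the error classes handled by defaultErrEncoder above.
func classifyExecErr(err error) string {
	switch {
	case errors.Is(errors.ExecTTL, err):
		return "timeout" // worker invalidated after exec_ttl elapsed; retry on a fresh worker
	case errors.Is(errors.SoftJob, err):
		return "soft" // PHP-level failure; the worker went back into the pool
	case errors.Is(errors.Network, err):
		return "network" // transport broken; the worker was killed
	default:
		return "fatal" // worker stopped; inspect before retrying
	}
}

func main() {
	err := errors.E(errors.SoftJob, errors.Errorf("example failure"))
	fmt.Println(classifyExecErr(err)) // soft
}
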
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 8e2667ac..a45aa29d 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -42,6 +42,34 @@ func Test_NewPool(t *testing.T) {
assert.NotNil(t, p)
}
+func Test_NewPoolReset(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ w := p.Workers()
+ if len(w) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ pid := w[0].Pid()
+ require.NoError(t, p.Reset(context.Background()))
+
+ w2 := p.Workers()
+ if len(w2) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ require.NotEqual(t, pid, w2[0].Pid())
+ p.Destroy(ctx)
+}
+
func Test_StaticPool_Invalid(t *testing.T) {
p, err := Initialize(
context.Background(),
@@ -67,6 +95,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
+ p.Destroy(context.Background())
}
func Test_StaticPool_Echo(t *testing.T) {
@@ -562,7 +591,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
func Test_CRC_WithPayload(t *testing.T) {
ctx := context.Background()
- _, err := Initialize(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
pipe.NewPipeFactory(),
@@ -571,6 +600,7 @@ func Test_CRC_WithPayload(t *testing.T) {
assert.Error(t, err)
data := err.Error()
assert.Contains(t, data, "warning: some weird php erro")
+ require.Nil(t, p)
}
/* PTR:
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 5f87c8a4..0502dc9a 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -51,6 +51,12 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
panic("used to satisfy pool interface")
}
+func (sp *supervised) Reset(ctx context.Context) error {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Reset(ctx)
+}
+
func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 98af918a..6e8ab552 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -58,6 +58,33 @@ func TestSupervisedPool_Exec(t *testing.T) {
p.Destroy(context.Background())
}
+func Test_SupervisedPoolReset(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ w := p.Workers()
+ if len(w) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ pid := w[0].Pid()
+ require.NoError(t, p.Reset(context.Background()))
+
+ w2 := p.Workers()
+ if len(w2) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ require.NotEqual(t, pid, w2[0].Pid())
+}
+
// This test should finish without freezes
func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
var cfgSupervised = cfgSupervised
diff --git a/worker/worker.go b/worker/worker.go
index 564d83c4..b2689c59 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -223,7 +223,6 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
-
w.events.Unsubscribe(w.eventsID)
return nil
}
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go
index c0c27575..65c2066e 100644
--- a/worker_watcher/container/channel/vec.go
+++ b/worker_watcher/container/channel/vec.go
@@ -98,15 +98,16 @@ func (v *Vec) Push(w worker.BaseProcess) {
func (v *Vec) Remove(_ int64) {}
func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
- /*
- if *addr == old {
- *addr = new
- return true
- }
- */
-
if atomic.LoadUint64(&v.destroy) == 1 {
- return nil, errors.E(errors.WatcherStopped)
+ // drain channel
+ for {
+ select {
+ case <-v.workers:
+ continue
+ default:
+ return nil, errors.E(errors.WatcherStopped)
+ }
+ }
}
// used only for the TTL-ed workers
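
Pop now drains whatever is still buffered before reporting WatcherStopped, so a worker pushed concurrently with destruction does not linger in the channel. The non-blocking drain idiom in isolation (illustrative sketch with plain ints in place of workers):

package main

import "fmt"

// drain empties a buffered channel without ever blocking: keep receiving while
// a value is immediately available, bail out at the first would-block default.
func drain(ch chan int) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	fmt.Println(drain(ch)) // 2
}
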
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index e59d9feb..cfde9931 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -82,7 +82,6 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
// Take is not a thread safe operation
func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
-
// thread safe operation
w, err := ww.container.Pop(ctx)
if err != nil {
@@ -222,6 +221,59 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
}
}
+func (ww *workerWatcher) Reset(ctx context.Context) {
+ // destroy container, we don't use ww mutex here, since we should be able to push worker
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 10)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ ww.RLock()
+ // that might be one of the workers is working
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
+ ww.RUnlock()
+ continue
+ }
+ ww.RUnlock()
+ // All container at this moment are in the container
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ ww.Lock()
+
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+
+ ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers))
+ ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers))
+ ww.Unlock()
+ return
+ case <-ctx.Done():
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
+ // kill workers
+ ww.Lock()
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+
+ ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers))
+ ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers))
+ ww.Unlock()
+ }
+ }
+}
+
// Destroy all underlying container (but let them complete the task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
// destroy container, we don't use ww mutex here, since we should be able to push worker
@@ -231,7 +283,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.Unlock()
ww.events.Unsubscribe(ww.eventsID)
- tt := time.NewTicker(time.Millisecond * 100)
+ tt := time.NewTicker(time.Millisecond * 10)
defer tt.Stop()
for {
select {
@@ -246,6 +298,8 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
// All container at this moment are in the container
// Pop operation is blocked, push can't be done, since it's not possible to pop
ww.Lock()
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
@@ -254,10 +308,13 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.Unlock()
return
case <-ctx.Done():
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
// kill workers
ww.Lock()
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
_ = ww.workers[i].Kill()
}
ww.Unlock()
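
Reset and the tightened Destroy share one shape: close the container so no new Pop succeeds, poll on a short ticker until every worker has been handed back (numWorkers equals the tracked slice length), then kill and reallocate under the write lock, with ctx.Done as the escape hatch. The skeleton of that wait-then-recycle pattern, reduced to plain counters (illustrative, not the watcher's actual code):

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type recycler struct {
	mu       sync.RWMutex
	idle     []int  // stand-in for the parked workers
	capacity uint64 // stand-in for ww.numWorkers
}

// reset waits until every worker is back, then discards and reallocates the set.
func (r *recycler) reset(ctx context.Context) error {
	tt := time.NewTicker(10 * time.Millisecond)
	defer tt.Stop()
	for {
		select {
		case <-tt.C:
			r.mu.RLock()
			busy := atomic.LoadUint64(&r.capacity) != uint64(len(r.idle))
			r.mu.RUnlock()
			if busy {
				continue // a worker is still executing; poll again
			}
			r.mu.Lock()
			r.idle = make([]int, 0, r.capacity) // "kill" the old set, start fresh
			r.mu.Unlock()
			return nil
		case <-ctx.Done():
			return ctx.Err() // stop waiting; the caller decides what to do next
		}
	}
}

func main() {
	r := &recycler{idle: []int{1, 2}, capacity: 2}
	fmt.Println(r.reset(context.Background())) // <nil>
}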