diff options
Diffstat (limited to 'pool')
-rw-r--r-- | pool/interface.go | 6 | ||||
-rwxr-xr-x | pool/static_pool.go | 130 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 32 | ||||
-rwxr-xr-x | pool/supervisor_pool.go | 6 | ||||
-rw-r--r-- | pool/supervisor_test.go | 27 |
5 files changed, 143 insertions, 58 deletions
diff --git a/pool/interface.go b/pool/interface.go index d089092f..6a150188 100644 --- a/pool/interface.go +++ b/pool/interface.go @@ -21,6 +21,9 @@ type Pool interface { // RemoveWorker removes worker from the pool. RemoveWorker(worker worker.BaseProcess) error + // Reset kill all workers inside the watcher and replaces with new + Reset(ctx context.Context) error + // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) @@ -45,6 +48,9 @@ type Watcher interface { // Destroy destroys the underlying container Destroy(ctx context.Context) + // Reset will replace container and workers array, kill all workers + Reset(ctx context.Context) + // List return all container w/o removing it from internal storage List() []worker.BaseProcess diff --git a/pool/static_pool.go b/pool/static_pool.go index 4906788f..7481f84f 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -156,6 +156,79 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { return rsp, nil } +// Destroy all underlying stack (but let them complete the task). +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.events.Unsubscribe(sp.eventsID) + sp.ww.Destroy(ctx) +} + +func (sp *StaticPool) Reset(ctx context.Context) error { + // destroy all workers + sp.ww.Reset(ctx) + workers, err := sp.allocateWorkers(sp.cfg.NumWorkers) + if err != nil { + return err + } + // add the NEW workers to the watcher + err = sp.ww.Watch(workers) + if err != nil { + return err + } + + return nil +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { + // just push event if on any stage was timeout error + switch { + case errors.Is(errors.ExecTTL, err): + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err))) + w.State().Set(worker.StateInvalid) + return nil, err + + case errors.Is(errors.SoftJob, err): + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) + + // if max jobs exceed + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // mark old as invalid and stop + w.State().Set(worker.StateInvalid) + errS := w.Stop() + if errS != nil { + return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, err + } + + // soft jobs errors are allowed, just put the worker back + sp.ww.Release(w) + + return nil, err + case errors.Is(errors.Network, err): + // in case of network error, we can't stop the worker, we should kill it + w.State().Set(worker.StateInvalid) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) + + // kill the worker instead of sending net packet to it + _ = w.Kill() + + return nil, err + default: + w.State().Set(worker.StateInvalid) + sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) + // stop the worker, worker here might be in the broken state (network) + errS := w.Stop() + if errS != nil { + return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, err + } + } +} + // Be careful, sync with pool.Exec method func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") @@ -225,63 +298,6 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -// Destroy all underlying stack (but let them complete the task). -func (sp *StaticPool) Destroy(ctx context.Context) { - sp.events.Unsubscribe(sp.eventsID) - sp.ww.Destroy(ctx) -} - -func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (*payload.Payload, error) { - // just push event if on any stage was timeout error - switch { - case errors.Is(errors.ExecTTL, err): - sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err))) - w.State().Set(worker.StateInvalid) - return nil, err - - case errors.Is(errors.SoftJob, err): - sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) - - // if max jobs exceed - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // mark old as invalid and stop - w.State().Set(worker.StateInvalid) - errS := w.Stop() - if errS != nil { - return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - - // soft jobs errors are allowed, just put the worker back - sp.ww.Release(w) - - return nil, err - case errors.Is(errors.Network, err): - // in case of network error, we can't stop the worker, we should kill it - w.State().Set(worker.StateInvalid) - sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) - - // kill the worker instead of sending net packet to it - _ = w.Kill() - - return nil, err - default: - w.State().Set(worker.StateInvalid) - sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) - // stop the worker, worker here might be in the broken state (network) - errS := w.Stop() - if errS != nil { - return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - } -} - func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 8e2667ac..a45aa29d 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -42,6 +42,34 @@ func Test_NewPool(t *testing.T) { assert.NotNil(t, p) } +func Test_NewPoolReset(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + w := p.Workers() + if len(w) == 0 { + t.Fatal("should be workers inside") + } + + pid := w[0].Pid() + require.NoError(t, p.Reset(context.Background())) + + w2 := p.Workers() + if len(w2) == 0 { + t.Fatal("should be workers inside") + } + + require.NotEqual(t, pid, w2[0].Pid()) + p.Destroy(ctx) +} + func Test_StaticPool_Invalid(t *testing.T) { p, err := Initialize( context.Background(), @@ -67,6 +95,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) + p.Destroy(context.Background()) } func Test_StaticPool_Echo(t *testing.T) { @@ -562,7 +591,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { func Test_CRC_WithPayload(t *testing.T) { ctx := context.Background() - _, err := Initialize( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") }, pipe.NewPipeFactory(), @@ -571,6 +600,7 @@ func Test_CRC_WithPayload(t *testing.T) { assert.Error(t, err) data := err.Error() assert.Contains(t, data, "warning: some weird php erro") + require.Nil(t, p) } /* PTR: diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 5f87c8a4..0502dc9a 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -51,6 +51,12 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo panic("used to satisfy pool interface") } +func (sp *supervised) Reset(ctx context.Context) error { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Reset(ctx) +} + func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index 98af918a..6e8ab552 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -58,6 +58,33 @@ func TestSupervisedPool_Exec(t *testing.T) { p.Destroy(context.Background()) } +func Test_SupervisedPoolReset(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + w := p.Workers() + if len(w) == 0 { + t.Fatal("should be workers inside") + } + + pid := w[0].Pid() + require.NoError(t, p.Reset(context.Background())) + + w2 := p.Workers() + if len(w2) == 0 { + t.Fatal("should be workers inside") + } + + require.NotEqual(t, pid, w2[0].Pid()) +} + // This test should finish without freezes func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { var cfgSupervised = cfgSupervised |