Diffstat (limited to 'pool')
-rw-r--r--   pool/interface.go          6
-rwxr-xr-x   pool/static_pool.go      130
-rwxr-xr-x   pool/static_pool_test.go  32
-rwxr-xr-x   pool/supervisor_pool.go    6
-rw-r--r--   pool/supervisor_test.go   27
5 files changed, 143 insertions(+), 58 deletions(-)
diff --git a/pool/interface.go b/pool/interface.go
index d089092f..6a150188 100644
--- a/pool/interface.go
+++ b/pool/interface.go
@@ -21,6 +21,9 @@ type Pool interface {
// RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
+ // Reset kills all workers inside the watcher and replaces them with new ones
+ Reset(ctx context.Context) error
+
// Destroy all underlying stack (but let them complete the task).
Destroy(ctx context.Context)
@@ -45,6 +48,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
+ // Reset kills all workers and replaces the container and the workers array
+ Reset(ctx context.Context)
+
// List returns all workers from the container w/o removing them from internal storage
List() []worker.BaseProcess
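
The new Reset entry points give callers a way to recycle every worker without tearing the pool down (and losing its event subscription) the way Destroy does. A minimal usage sketch, assuming `p` is any value satisfying the Pool interface above; the helper function itself is hypothetical and not part of this change:

    package pool

    import "context"

    // recycleWorkers is an illustrative caller of the new API: it asks the pool
    // to kill its current workers and allocate a fresh set, leaving the pool
    // itself (and its event subscription) alive.
    func recycleWorkers(ctx context.Context, p Pool) error {
        if err := p.Reset(ctx); err != nil {
            // allocating or watching the replacement workers failed
            return err
        }
        return nil
    }
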
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 4906788f..7481f84f 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -156,6 +156,79 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
return rsp, nil
}
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.events.Unsubscribe(sp.eventsID)
+ sp.ww.Destroy(ctx)
+}
+
+func (sp *StaticPool) Reset(ctx context.Context) error {
+ // destroy all workers
+ sp.ww.Reset(ctx)
+ workers, err := sp.allocateWorkers(sp.cfg.NumWorkers)
+ if err != nil {
+ return err
+ }
+ // add the NEW workers to the watcher
+ err = sp.ww.Watch(workers)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
+ // just push an event if there was a timeout error at any stage
+ switch {
+ case errors.Is(errors.ExecTTL, err):
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+ w.State().Set(worker.StateInvalid)
+ return nil, err
+
+ case errors.Is(errors.SoftJob, err):
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+ // if the max jobs limit is exceeded
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
+ w.State().Set(worker.StateInvalid)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+
+ // soft job errors are allowed, just put the worker back
+ sp.ww.Release(w)
+
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+ }
+}
+
// Be careful, sync with pool.Exec method
func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
@@ -225,63 +298,6 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.events.Unsubscribe(sp.eventsID)
- sp.ww.Destroy(ctx)
-}
-
-func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- // just push event if on any stage was timeout error
- switch {
- case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
- w.State().Set(worker.StateInvalid)
- return nil, err
-
- case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
- // if max jobs exceed
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // mark old as invalid and stop
- w.State().Set(worker.StateInvalid)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
-
- // soft jobs errors are allowed, just put the worker back
- sp.ww.Release(w)
-
- return nil, err
- case errors.Is(errors.Network, err):
- // in case of network error, we can't stop the worker, we should kill it
- w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
- // kill the worker instead of sending net packet to it
- _ = w.Kill()
-
- return nil, err
- default:
- w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
- // stop the worker, worker here might be in the broken state (network)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
- }
-}
-
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
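
StaticPool.Reset above delegates the destructive half of the work to the worker watcher: ww.Reset(ctx) has to invalidate and dispose of every tracked worker so that the following allocateWorkers/Watch pair can install a fresh set. The watcher change itself is not shown in this diff; the sketch below is only an assumption of its shape (type name, fields, and locking are hypothetical), illustrating the contract the pool relies on:

    // Hypothetical sketch of the watcher side; not the actual worker_watcher code.
    func (ww *workerWatcher) Reset(_ context.Context) {
        ww.Lock()
        defer ww.Unlock()

        // every tracked worker is being replaced, so invalidate and kill them outright
        for i := 0; i < len(ww.workers); i++ {
            ww.workers[i].State().Set(worker.StateInvalid)
            _ = ww.workers[i].Kill()
        }

        // drop the old slice; StaticPool.Reset will Watch() the replacements
        ww.workers = ww.workers[:0]
    }
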
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 8e2667ac..a45aa29d 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -42,6 +42,34 @@ func Test_NewPool(t *testing.T) {
assert.NotNil(t, p)
}
+func Test_NewPoolReset(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ w := p.Workers()
+ if len(w) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ pid := w[0].Pid()
+ require.NoError(t, p.Reset(context.Background()))
+
+ w2 := p.Workers()
+ if len(w2) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ require.NotEqual(t, pid, w2[0].Pid())
+ p.Destroy(ctx)
+}
+
func Test_StaticPool_Invalid(t *testing.T) {
p, err := Initialize(
context.Background(),
@@ -67,6 +95,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
+ p.Destroy(context.Background())
}
func Test_StaticPool_Echo(t *testing.T) {
@@ -562,7 +591,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
func Test_CRC_WithPayload(t *testing.T) {
ctx := context.Background()
- _, err := Initialize(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
pipe.NewPipeFactory(),
@@ -571,6 +600,7 @@ func Test_CRC_WithPayload(t *testing.T) {
assert.Error(t, err)
data := err.Error()
assert.Contains(t, data, "warning: some weird php erro")
+ require.Nil(t, p)
}
/* PTR:
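
Both reset tests follow the same pattern: record the first worker's PID, call Reset, and require that the PID has changed. If a stronger guarantee is wanted, the before/after PID sets could be compared in full; a possible helper for that (purely illustrative, assuming Pid() returns int64 as elsewhere in the worker package):

    // pids collects worker PIDs into a set so a test can assert that no
    // pre-reset PID survives the reset; illustrative helper only.
    func pids(ws []worker.BaseProcess) map[int64]struct{} {
        out := make(map[int64]struct{}, len(ws))
        for i := 0; i < len(ws); i++ {
            out[ws[i].Pid()] = struct{}{}
        }
        return out
    }
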
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 5f87c8a4..0502dc9a 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -51,6 +51,12 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
panic("used to satisfy pool interface")
}
+func (sp *supervised) Reset(ctx context.Context) error {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Reset(ctx)
+}
+
func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
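
The supervised wrapper takes its mutex before delegating, so a Reset issued from another goroutine is at least serialized with the wrapper's other locked operations. A hypothetical wiring (not part of this change, Unix-only signal; imports: os, os/signal, syscall, log, context) that recycles workers on demand:

    // resetOnSignal is an illustrative trigger: SIGUSR2 recycles the pool's workers.
    func resetOnSignal(p Pool) {
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGUSR2)
        for range sig {
            if err := p.Reset(context.Background()); err != nil {
                log.Println("pool reset failed:", err)
            }
        }
    }
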
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 98af918a..6e8ab552 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -58,6 +58,33 @@ func TestSupervisedPool_Exec(t *testing.T) {
p.Destroy(context.Background())
}
+func Test_SupervisedPoolReset(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ w := p.Workers()
+ if len(w) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ pid := w[0].Pid()
+ require.NoError(t, p.Reset(context.Background()))
+
+ w2 := p.Workers()
+ if len(w2) == 0 {
+ t.Fatal("should be workers inside")
+ }
+
+ require.NotEqual(t, pid, w2[0].Pid())
+}
+
// This test should finish without freezes
func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
var cfgSupervised = cfgSupervised