From b4e4f7e7e60bff48a63df4a3c606398ea2a32d8a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 15 Dec 2021 00:12:23 +0300 Subject: Update static_pool and worker to wait response from the worker Signed-off-by: Valery Piashchynski --- pool/static_pool.go | 12 +++++++++++- pool/supervisor_test.go | 7 +++++-- 2 files changed, 16 insertions(+), 3 deletions(-) (limited to 'pool') diff --git a/pool/static_pool.go b/pool/static_pool.go index 9897b9e7..9636f19f 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -319,7 +319,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { // destroy the worker sw.State().Set(worker.StateDestroyed) - err = sw.Kill() + err = sw.Stop() if err != nil { sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid()))) return nil, err @@ -337,6 +337,16 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) + if err != nil { + return nil, err + } + + go func() { + // read the exit status to prevent process to be a zombie + _ = sw.Wait() + }() + + sw.State().Set(worker.StateDestroyed) if stopErr := sw.Stop(); stopErr != nil { sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index eb3c37dd..98af918a 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -211,18 +211,21 @@ func TestSupervisedPool_Idle(t *testing.T) { Body: []byte("foo"), }) - assert.Nil(t, err) + assert.NoError(t, err) assert.Empty(t, resp.Body) assert.Empty(t, resp.Context) time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.Exec(&payload.Payload{ + rsp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) assert.NoError(t, err) + require.NotNil(t, rsp) + time.Sleep(time.Second * 2) + require.Len(t, p.Workers(), 1) // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) p.Destroy(context.Background()) -- cgit v1.2.3 From 33b32b1df5ad602b389925c0242c9be41b71953f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 15 Dec 2021 00:38:41 +0300 Subject: small refactoring Signed-off-by: Valery Piashchynski --- pool/static_pool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'pool') diff --git a/pool/static_pool.go b/pool/static_pool.go index 9636f19f..4906788f 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -318,7 +318,6 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { }() // destroy the worker - sw.State().Set(worker.StateDestroyed) err = sw.Stop() if err != nil { sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid()))) @@ -346,9 +345,10 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) _ = sw.Wait() }() - sw.State().Set(worker.StateDestroyed) - if stopErr := sw.Stop(); stopErr != nil { - sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) + err = sw.Stop() + if err != nil { + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid()))) + return nil, err } return r, err -- cgit v1.2.3