diff options
author | Valery Piashchynski <[email protected]> | 2021-12-15 01:02:39 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-12-15 01:02:39 +0300 |
commit | ff1401e97a9d1f3c059a60acbcae3f4507dcfe03 (patch) | |
tree | 7076f4712f9b4dbfb6d3a1bae3f4ba812d92a89d | |
parent | f2c79017ae5759256b03ec58b608f298a29e4b96 (diff) | |
parent | 1bcb131c1ace6bdb47123cf05e4943ac3c4744c4 (diff) |
[#872]: bug(static_pool, debug mode): worker exited immediately after obtaining the response
-rw-r--r-- | CHANGELOG.md | 11 | ||||
-rw-r--r-- | events/events.go | 5 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rwxr-xr-x | internal/protocol.go | 4 | ||||
-rwxr-xr-x | pool/static_pool.go | 18 | ||||
-rw-r--r-- | pool/supervisor_test.go | 7 | ||||
-rw-r--r-- | transport/pipe/pipe_factory_spawn_test.go | 36 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 36 | ||||
-rw-r--r-- | transport/socket/socket_factory_spawn_test.go | 30 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 30 | ||||
-rwxr-xr-x | worker/sync_worker.go | 32 | ||||
-rwxr-xr-x | worker/worker.go | 34 |
13 files changed, 153 insertions, 96 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index b2f28637..702099af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,20 @@ # CHANGELOG +# v2.6.2 (15.12.2021) + +## 🩹 Fixes: + +- 🐛 Fix: worker exited immediately after obtaining the response. [BUG](https://github.com/spiral/roadrunner/issues/871) (reporter: @samdark). + +--- + # v2.6.1 (14.12.2021) ## 🩹 Fixes: - 🐛 Fix: memory leak when supervised static pool used. [PR](https://github.com/spiral/roadrunner/pull/870). +--- # v2.6.0 (30.11.2021) @@ -18,6 +27,8 @@ - 🐛 Fix: zombie processes in the `pool.debug` mode. +--- + # v2.5.1 (07.11.2021) ## 🩹 Fixes: diff --git a/events/events.go b/events/events.go index 0d6483e3..42519637 100644 --- a/events/events.go +++ b/events/events.go @@ -25,6 +25,8 @@ const ( EventWorkerStderr // EventWorkerWaitExit is the worker exit event EventWorkerWaitExit + // EventWorkerStopped triggered when worker gracefully stopped + EventWorkerStopped ) func (et EventType) String() string { @@ -51,7 +53,8 @@ func (et EventType) String() string { return "EventWorkerStderr" case EventWorkerWaitExit: return "EventWorkerWaitExit" - + case EventWorkerStopped: + return "EventWorkerStopped" default: return "UnknownEventType" } @@ -26,7 +26,7 @@ require ( github.com/tklauser/numcpus v0.3.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect + golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) @@ -50,8 +50,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cO golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= -golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed h1:d5glpD+GMms2DMbu1doSYibjbKasYNvnhq885nOnRz8= +golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/internal/protocol.go b/internal/protocol.go index ba923ef5..c5916f22 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -4,14 +4,12 @@ import ( "os" "sync" - j "github.com/json-iterator/go" + json "github.com/json-iterator/go" "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/goridge/v3/pkg/relay" ) -var json = j.ConfigCompatibleWithStandardLibrary - type StopCommand struct { Stop bool `json:"stop"` } diff --git a/pool/static_pool.go b/pool/static_pool.go index 9897b9e7..4906788f 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -318,8 +318,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { }() // destroy the worker - sw.State().Set(worker.StateDestroyed) - err = sw.Kill() + err = sw.Stop() if err != nil { sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid()))) return nil, err @@ -337,8 +336,19 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) - if stopErr := sw.Stop(); stopErr != nil { - sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) + if err != nil { + return nil, err + } + + go func() { + // read the exit status to prevent process to be a zombie + _ = sw.Wait() + }() + + err = sw.Stop() + if err != nil { + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid()))) + return nil, err } return r, err diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index eb3c37dd..98af918a 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -211,18 +211,21 @@ func TestSupervisedPool_Idle(t *testing.T) { Body: []byte("foo"), }) - assert.Nil(t, err) + assert.NoError(t, err) assert.Empty(t, resp.Body) assert.Empty(t, resp.Context) time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.Exec(&payload.Payload{ + rsp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) assert.NoError(t, err) + require.NotNil(t, rsp) + time.Sleep(time.Second * 2) + require.Len(t, p.Workers(), 1) // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) p.Destroy(context.Background()) diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go index 96dd37a6..7e04f113 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -133,18 +133,9 @@ func Test_Pipe_Invalid2(t *testing.T) { func Test_Pipe_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() + assert.NoError(t, err) sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) @@ -152,27 +143,31 @@ func Test_Pipe_Echo2(t *testing.T) { assert.NotNil(t, res.Body) assert.Empty(t, res.Context) + go func() { + if w.Wait() != nil { + t.Fail() + } + }() + assert.Equal(t, "hello", res.String()) + err = w.Stop() + assert.NoError(t, err) } func Test_Pipe_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() + assert.NoError(t, err) + require.NotNil(t, w) sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) assert.Nil(t, res) + + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) } func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { @@ -315,7 +310,6 @@ func Test_BadPayload2(t *testing.T) { }() res, err := sw.Exec(&payload.Payload{}) - assert.Error(t, err) assert.Nil(t, res) diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index 7ca49d09..c69be298 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -159,6 +159,7 @@ func Test_Pipe_Echo(t *testing.T) { if err != nil { t.Fatal(err) } + defer func() { err = w.Stop() if err != nil { @@ -167,14 +168,18 @@ func Test_Pipe_Echo(t *testing.T) { }() sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) assert.Empty(t, res.Context) + go func() { + if w.Wait() != nil { + t.Fail() + } + }() + assert.Equal(t, "hello", res.String()) } @@ -194,14 +199,18 @@ func Test_Pipe_Echo_Script(t *testing.T) { }() sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) assert.Empty(t, res.Context) + go func() { + if w.Wait() != nil { + t.Fail() + } + }() + assert.Equal(t, "hello", res.String()) } @@ -210,21 +219,22 @@ func Test_Pipe_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) + require.NoError(t, err) + require.NotNil(t, w) + + go func() { + errW := w.Wait() + require.Error(t, errW) }() sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) assert.Nil(t, res) + + time.Sleep(time.Second) + err = w.Stop() + assert.NoError(t, err) } func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go index 2db2fd40..fd852080 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/transport/socket/socket_factory_spawn_test.go @@ -53,7 +53,6 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { } cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - f := NewSocketServer(ls, time.Minute) defer func() { err = ls.Close() @@ -66,6 +65,10 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, w) + go func() { + require.NoError(t, w.Wait()) + }() + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) @@ -180,15 +183,7 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, errW) }() - defer func() { - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - assert.Error(t, err2) - }() - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) @@ -198,6 +193,12 @@ func Test_Tcp_Broken2(t *testing.T) { if !strings.Contains(ev.Message(), "undefined_function()") { t.Fatal("should contain undefined_function() string") } + + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + // but process exited + assert.NoError(t, err2) } func Test_Tcp_Echo2(t *testing.T) { @@ -347,14 +348,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, errW) }() - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) @@ -365,6 +359,10 @@ func Test_Unix_Broken2(t *testing.T) { if !strings.Contains(ev.Message(), "undefined_function()") { t.Fatal("should contain undefined_function string") } + + time.Sleep(time.Second) + err = w.Stop() + assert.NoError(t, err) } func Test_Unix_Echo2(t *testing.T) { diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index 7b28a847..10885bac 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -58,7 +58,6 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { } cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - f := NewSocketServer(ls, time.Minute) defer func() { err = ls.Close() @@ -71,6 +70,10 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, w) + go func() { + require.NoError(t, w.Wait()) + }() + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) @@ -221,15 +224,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, errW) }() - defer func() { - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - assert.Error(t, err2) - }() - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) @@ -239,6 +234,12 @@ func Test_Tcp_Broken(t *testing.T) { if !strings.Contains(ev.Message(), "undefined_function()") { t.Fatal("should contain undefined_function string") } + + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + // but process is stopped + assert.NoError(t, err2) } func Test_Tcp_Echo(t *testing.T) { @@ -460,14 +461,7 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, errW) }() - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) @@ -478,6 +472,10 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal("should contain undefined_function string") } + time.Sleep(time.Second) + err = w.Stop() + assert.NoError(t, err) + wg.Wait() } diff --git a/worker/sync_worker.go b/worker/sync_worker.go index 81d8c5bf..e3e85ba6 100755 --- a/worker/sync_worker.go +++ b/worker/sync_worker.go @@ -20,6 +20,7 @@ type SyncWorkerImpl struct { process *Process fPool sync.Pool bPool sync.Pool + chPool sync.Pool } // From creates SyncWorker from BaseProcess @@ -32,12 +33,17 @@ func From(process *Process) *SyncWorkerImpl { bPool: sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}, + + chPool: sync.Pool{New: func() interface{} { + return make(chan wexec, 1) + }}, } } // Exec payload without TTL timeout. func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec") + if len(p.Body) == 0 && len(p.Context) == 0 { return nil, errors.E(op, errors.Str("payload can not be empty")) } @@ -81,7 +87,13 @@ type wexec struct { // ExecWithTTL executes payload without TTL timeout. func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") - c := make(chan wexec, 1) + + if len(p.Body) == 0 && len(p.Context) == 0 { + return nil, errors.E(op, errors.Str("payload can not be empty")) + } + + c := tw.getCh() + defer tw.putCh(c) // worker was killed before it started to work (supervisor) if tw.process.State().Value() != StateReady { @@ -91,10 +103,6 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) ( tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) tw.process.State().Set(StateWorking) - if len(p.Body) == 0 && len(p.Context) == 0 { - return nil, errors.E(op, errors.Str("payload can not be empty")) - } - go func() { rsp, err := tw.execPayload(p) if err != nil { @@ -271,3 +279,17 @@ func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { f.Reset() tw.fPool.Put(f) } + +func (tw *SyncWorkerImpl) getCh() chan wexec { + return tw.chPool.Get().(chan wexec) +} + +func (tw *SyncWorkerImpl) putCh(ch chan wexec) { + // just check if the chan is not empty + select { + case <-ch: + tw.chPool.Put(ch) + default: + tw.chPool.Put(ch) + } +} diff --git a/worker/worker.go b/worker/worker.go index e5c3a192..564d83c4 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -44,7 +44,8 @@ type Process struct { // pid of the process, points to pid of underlying process and // can be nil while process is not started. - pid int + pid int + doneCh chan struct{} // communication bus with underlying process. relay relay.Relay @@ -63,6 +64,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { eventsID: id, cmd: cmd, state: NewWorkerState(StateInactive), + doneCh: make(chan struct{}, 1), } // set self as stderr implementation (Writer interface) @@ -136,6 +138,7 @@ func (w *Process) Wait() error { var err error err = w.cmd.Wait() defer w.events.Unsubscribe(w.eventsID) + w.doneCh <- struct{}{} // If worker was destroyed, just exit if w.State().Value() == StateDestroyed { @@ -179,19 +182,26 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { const op = errors.Op("process_stop") - w.state.Set(StateStopping) - err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) - if err != nil { - w.state.Set(StateKilling) - _ = w.cmd.Process.Signal(os.Kill) + defer w.events.Unsubscribe(w.eventsID) - w.events.Unsubscribe(w.eventsID) - return errors.E(op, errors.Network, err) - } + select { + // finished + case <-w.doneCh: + return nil + default: + w.state.Set(StateStopping) + err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) + if err != nil { + w.state.Set(StateKilling) + _ = w.cmd.Process.Signal(os.Kill) - w.state.Set(StateStopped) - w.events.Unsubscribe(w.eventsID) - return nil + return errors.E(op, errors.Network, err) + } + + <-w.doneCh + w.state.Set(StateStopped) + return nil + } } // Kill kills underlying process, make sure to call Wait() func to gather |