summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-15 01:02:39 +0300
committerGitHub <[email protected]>2021-12-15 01:02:39 +0300
commitff1401e97a9d1f3c059a60acbcae3f4507dcfe03 (patch)
tree7076f4712f9b4dbfb6d3a1bae3f4ba812d92a89d
parentf2c79017ae5759256b03ec58b608f298a29e4b96 (diff)
parent1bcb131c1ace6bdb47123cf05e4943ac3c4744c4 (diff)
[#872]: bug(static_pool, debug mode): worker exited immediately after obtaining the response
-rw-r--r--CHANGELOG.md11
-rw-r--r--events/events.go5
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rwxr-xr-xinternal/protocol.go4
-rwxr-xr-xpool/static_pool.go18
-rw-r--r--pool/supervisor_test.go7
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go36
-rwxr-xr-xtransport/pipe/pipe_factory_test.go36
-rw-r--r--transport/socket/socket_factory_spawn_test.go30
-rwxr-xr-xtransport/socket/socket_factory_test.go30
-rwxr-xr-xworker/sync_worker.go32
-rwxr-xr-xworker/worker.go34
13 files changed, 153 insertions, 96 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b2f28637..702099af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,20 @@
# CHANGELOG
+# v2.6.2 (15.12.2021)
+
+## 🩹 Fixes:
+
+- 🐛 Fix: worker exited immediately after obtaining the response. [BUG](https://github.com/spiral/roadrunner/issues/871) (reporter: @samdark).
+
+---
+
# v2.6.1 (14.12.2021)
## 🩹 Fixes:
- 🐛 Fix: memory leak when supervised static pool used. [PR](https://github.com/spiral/roadrunner/pull/870).
+---
# v2.6.0 (30.11.2021)
@@ -18,6 +27,8 @@
- 🐛 Fix: zombie processes in the `pool.debug` mode.
+---
+
# v2.5.1 (07.11.2021)
## 🩹 Fixes:
diff --git a/events/events.go b/events/events.go
index 0d6483e3..42519637 100644
--- a/events/events.go
+++ b/events/events.go
@@ -25,6 +25,8 @@ const (
EventWorkerStderr
// EventWorkerWaitExit is the worker exit event
EventWorkerWaitExit
+ // EventWorkerStopped triggered when worker gracefully stopped
+ EventWorkerStopped
)
func (et EventType) String() string {
@@ -51,7 +53,8 @@ func (et EventType) String() string {
return "EventWorkerStderr"
case EventWorkerWaitExit:
return "EventWorkerWaitExit"
-
+ case EventWorkerStopped:
+ return "EventWorkerStopped"
default:
return "UnknownEventType"
}
diff --git a/go.mod b/go.mod
index cffbddac..c5547a85 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,7 @@ require (
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
- golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect
+ golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
diff --git a/go.sum b/go.sum
index 9cf093b9..cf51378e 100644
--- a/go.sum
+++ b/go.sum
@@ -50,8 +50,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cO
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc=
-golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed h1:d5glpD+GMms2DMbu1doSYibjbKasYNvnhq885nOnRz8=
+golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
diff --git a/internal/protocol.go b/internal/protocol.go
index ba923ef5..c5916f22 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -4,14 +4,12 @@ import (
"os"
"sync"
- j "github.com/json-iterator/go"
+ json "github.com/json-iterator/go"
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/goridge/v3/pkg/relay"
)
-var json = j.ConfigCompatibleWithStandardLibrary
-
type StopCommand struct {
Stop bool `json:"stop"`
}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 9897b9e7..4906788f 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -318,8 +318,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
}()
// destroy the worker
- sw.State().Set(worker.StateDestroyed)
- err = sw.Kill()
+ err = sw.Stop()
if err != nil {
sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
return nil, err
@@ -337,8 +336,19 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// redirect call to the worker with TTL
r, err := sw.ExecWithTTL(ctx, p)
- if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
+ if err != nil {
+ return nil, err
+ }
+
+ go func() {
+ // read the exit status to prevent process to be a zombie
+ _ = sw.Wait()
+ }()
+
+ err = sw.Stop()
+ if err != nil {
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ return nil, err
}
return r, err
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index eb3c37dd..98af918a 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -211,18 +211,21 @@ func TestSupervisedPool_Idle(t *testing.T) {
Body: []byte("foo"),
})
- assert.Nil(t, err)
+ assert.NoError(t, err)
assert.Empty(t, resp.Body)
assert.Empty(t, resp.Context)
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.Exec(&payload.Payload{
+ rsp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
assert.NoError(t, err)
+ require.NotNil(t, rsp)
+ time.Sleep(time.Second * 2)
+ require.Len(t, p.Workers(), 1)
// should be new worker with new pid
assert.NotEqual(t, pid, p.Workers()[0].Pid())
p.Destroy(context.Background())
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 96dd37a6..7e04f113 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -133,18 +133,9 @@ func Test_Pipe_Invalid2(t *testing.T) {
func Test_Pipe_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
+ assert.NoError(t, err)
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
@@ -152,27 +143,31 @@ func Test_Pipe_Echo2(t *testing.T) {
assert.NotNil(t, res.Body)
assert.Empty(t, res.Context)
+ go func() {
+ if w.Wait() != nil {
+ t.Fail()
+ }
+ }()
+
assert.Equal(t, "hello", res.String())
+ err = w.Stop()
+ assert.NoError(t, err)
}
func Test_Pipe_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
+ assert.NoError(t, err)
+ require.NotNil(t, w)
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.Error(t, err)
assert.Nil(t, res)
+
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.Error(t, err)
}
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
@@ -315,7 +310,6 @@ func Test_BadPayload2(t *testing.T) {
}()
res, err := sw.Exec(&payload.Payload{})
-
assert.Error(t, err)
assert.Nil(t, res)
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index 7ca49d09..c69be298 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -159,6 +159,7 @@ func Test_Pipe_Echo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
+
defer func() {
err = w.Stop()
if err != nil {
@@ -167,14 +168,18 @@ func Test_Pipe_Echo(t *testing.T) {
}()
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
assert.Empty(t, res.Context)
+ go func() {
+ if w.Wait() != nil {
+ t.Fail()
+ }
+ }()
+
assert.Equal(t, "hello", res.String())
}
@@ -194,14 +199,18 @@ func Test_Pipe_Echo_Script(t *testing.T) {
}()
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
assert.Empty(t, res.Context)
+ go func() {
+ if w.Wait() != nil {
+ t.Fail()
+ }
+ }()
+
assert.Equal(t, "hello", res.String())
}
@@ -210,21 +219,22 @@ func Test_Pipe_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
+ require.NoError(t, err)
+ require.NotNil(t, w)
+
+ go func() {
+ errW := w.Wait()
+ require.Error(t, errW)
}()
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.Error(t, err)
assert.Nil(t, res)
+
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.NoError(t, err)
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 2db2fd40..fd852080 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -53,7 +53,6 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
-
f := NewSocketServer(ls, time.Minute)
defer func() {
err = ls.Close()
@@ -66,6 +65,10 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, w)
+ go func() {
+ require.NoError(t, w.Wait())
+ }()
+
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -180,15 +183,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err2 := w.Stop()
- // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
- assert.Error(t, err2)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res)
@@ -198,6 +193,12 @@ func Test_Tcp_Broken2(t *testing.T) {
if !strings.Contains(ev.Message(), "undefined_function()") {
t.Fatal("should contain undefined_function() string")
}
+
+ time.Sleep(time.Second)
+ err2 := w.Stop()
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ // but process exited
+ assert.NoError(t, err2)
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -347,14 +348,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -365,6 +359,10 @@ func Test_Unix_Broken2(t *testing.T) {
if !strings.Contains(ev.Message(), "undefined_function()") {
t.Fatal("should contain undefined_function string")
}
+
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.NoError(t, err)
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 7b28a847..10885bac 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -58,7 +58,6 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
-
f := NewSocketServer(ls, time.Minute)
defer func() {
err = ls.Close()
@@ -71,6 +70,10 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, w)
+ go func() {
+ require.NoError(t, w.Wait())
+ }()
+
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -221,15 +224,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err2 := w.Stop()
- // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
- assert.Error(t, err2)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res)
@@ -239,6 +234,12 @@ func Test_Tcp_Broken(t *testing.T) {
if !strings.Contains(ev.Message(), "undefined_function()") {
t.Fatal("should contain undefined_function string")
}
+
+ time.Sleep(time.Second)
+ err2 := w.Stop()
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ // but process is stopped
+ assert.NoError(t, err2)
}
func Test_Tcp_Echo(t *testing.T) {
@@ -460,14 +461,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -478,6 +472,10 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal("should contain undefined_function string")
}
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.NoError(t, err)
+
wg.Wait()
}
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
index 81d8c5bf..e3e85ba6 100755
--- a/worker/sync_worker.go
+++ b/worker/sync_worker.go
@@ -20,6 +20,7 @@ type SyncWorkerImpl struct {
process *Process
fPool sync.Pool
bPool sync.Pool
+ chPool sync.Pool
}
// From creates SyncWorker from BaseProcess
@@ -32,12 +33,17 @@ func From(process *Process) *SyncWorkerImpl {
bPool: sync.Pool{New: func() interface{} {
return new(bytes.Buffer)
}},
+
+ chPool: sync.Pool{New: func() interface{} {
+ return make(chan wexec, 1)
+ }},
}
}
// Exec payload without TTL timeout.
func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
+
if len(p.Body) == 0 && len(p.Context) == 0 {
return nil, errors.E(op, errors.Str("payload can not be empty"))
}
@@ -81,7 +87,13 @@ type wexec struct {
// ExecWithTTL executes payload without TTL timeout.
func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
- c := make(chan wexec, 1)
+
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
+ }
+
+ c := tw.getCh()
+ defer tw.putCh(c)
// worker was killed before it started to work (supervisor)
if tw.process.State().Value() != StateReady {
@@ -91,10 +103,6 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
tw.process.State().Set(StateWorking)
- if len(p.Body) == 0 && len(p.Context) == 0 {
- return nil, errors.E(op, errors.Str("payload can not be empty"))
- }
-
go func() {
rsp, err := tw.execPayload(p)
if err != nil {
@@ -271,3 +279,17 @@ func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
f.Reset()
tw.fPool.Put(f)
}
+
+func (tw *SyncWorkerImpl) getCh() chan wexec {
+ return tw.chPool.Get().(chan wexec)
+}
+
+func (tw *SyncWorkerImpl) putCh(ch chan wexec) {
+ // just check if the chan is not empty
+ select {
+ case <-ch:
+ tw.chPool.Put(ch)
+ default:
+ tw.chPool.Put(ch)
+ }
+}
diff --git a/worker/worker.go b/worker/worker.go
index e5c3a192..564d83c4 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -44,7 +44,8 @@ type Process struct {
// pid of the process, points to pid of underlying process and
// can be nil while process is not started.
- pid int
+ pid int
+ doneCh chan struct{}
// communication bus with underlying process.
relay relay.Relay
@@ -63,6 +64,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
eventsID: id,
cmd: cmd,
state: NewWorkerState(StateInactive),
+ doneCh: make(chan struct{}, 1),
}
// set self as stderr implementation (Writer interface)
@@ -136,6 +138,7 @@ func (w *Process) Wait() error {
var err error
err = w.cmd.Wait()
defer w.events.Unsubscribe(w.eventsID)
+ w.doneCh <- struct{}{}
// If worker was destroyed, just exit
if w.State().Value() == StateDestroyed {
@@ -179,19 +182,26 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
const op = errors.Op("process_stop")
- w.state.Set(StateStopping)
- err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
- if err != nil {
- w.state.Set(StateKilling)
- _ = w.cmd.Process.Signal(os.Kill)
+ defer w.events.Unsubscribe(w.eventsID)
- w.events.Unsubscribe(w.eventsID)
- return errors.E(op, errors.Network, err)
- }
+ select {
+ // finished
+ case <-w.doneCh:
+ return nil
+ default:
+ w.state.Set(StateStopping)
+ err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
+ if err != nil {
+ w.state.Set(StateKilling)
+ _ = w.cmd.Process.Signal(os.Kill)
- w.state.Set(StateStopped)
- w.events.Unsubscribe(w.eventsID)
- return nil
+ return errors.E(op, errors.Network, err)
+ }
+
+ <-w.doneCh
+ w.state.Set(StateStopped)
+ return nil
+ }
}
// Kill kills underlying process, make sure to call Wait() func to gather