From 4151bbffe7b3ab882de5f7ac29f41c974679f087 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 16:46:32 +0300 Subject: Fix TTL issue, added explanation comments. The worker after it executed the request, may overwrite the TTL state. This inconsistency leads to the +1 worker in the FIFO channel. In this state, the Push operation was blocked. Add RR_BROADCAST_PATH. Signed-off-by: Valery Piashchynski --- pkg/pool/interface.go | 24 ++++++++++++++++ pkg/pool/static_pool.go | 2 +- pkg/pool/supervisor_pool.go | 39 ++++++++++++++++++++++---- pkg/pool/supervisor_test.go | 54 ++++++++++++++++++++++++++++++++++++ pkg/worker/sync_worker.go | 7 +++++ pkg/worker_watcher/container/vec.go | 2 +- pkg/worker_watcher/interface.go | 31 --------------------- pkg/worker_watcher/worker_watcher.go | 21 ++++++++++---- plugins/websockets/plugin.go | 9 ++++-- tests/sleep-ttl.php | 15 ++++++++++ 10 files changed, 157 insertions(+), 47 deletions(-) delete mode 100644 pkg/worker_watcher/interface.go create mode 100644 tests/sleep-ttl.php diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index c22fbbd3..bbf7653e 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -27,3 +27,27 @@ type Pool interface { // ExecWithContext executes task with context which is used with timeout execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) } + +// Watcher is an interface for the Sync workers lifecycle +type Watcher interface { + // Watch used to add workers to the container + Watch(workers []worker.BaseProcess) error + + // Get provide first free worker + Get(ctx context.Context) (worker.BaseProcess, error) + + // Push enqueues worker back + Push(w worker.BaseProcess) + + // Allocate - allocates new worker and put it into the WorkerWatcher + Allocate() error + + // Destroy destroys the underlying container + Destroy(ctx context.Context) + + // List return all container w/o removing it from internal storage + List() []worker.BaseProcess + + // Remove will remove worker from the container + Remove(wb worker.BaseProcess) +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index e568661f..5a6247b5 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -41,7 +41,7 @@ type StaticPool struct { listeners []events.Listener // manages worker states and TTLs - ww workerWatcher.Watcher + ww Watcher // allocate new worker allocator worker.Allocator diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index b09b6f6c..4b990dbe 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit worker.StateDestroyed, worker.StateInactive, worker.StateStopped, - worker.StateStopping: + worker.StateStopping, + worker.StateKilling: continue } @@ -132,23 +133,40 @@ func (sp *supervised) control() { //nolint:gocognit } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - // SOFT termination. DO NOT STOP active workers + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + if workers[i].State().Value() != worker.StateWorking { workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } + // just to double check + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - // SOFT termination. DO NOT STOP active workers + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + if workers[i].State().Value() != worker.StateWorking { workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } - - // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done + // just to double check workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue @@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + if workers[i].State().Value() != worker.StateWorking { workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } - + // just to double check workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 513d369f..1cd301ba 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -9,7 +9,9 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfgSupervised = Config{ @@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { assert.NotEqual(t, pid, p.Workers()[0].Pid()) } +func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + pid = p.Workers()[0].Pid() + + resp, err = p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + + p.Destroy(context.Background()) +} + func TestSupervisedPool_Idle(t *testing.T) { var cfgExecTTL = Config{ NumWorkers: uint64(1), diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 84ff5977..02f11d0b 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -60,6 +60,13 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, err) } + // supervisor may set state of the worker during the work + // in this case we should not re-write the worker state + if tw.process.State().Value() != StateWorking { + tw.process.State().RegisterExec() + return rsp, nil + } + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index b9150c43..24b5fa6d 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -13,7 +13,7 @@ type Vec struct { workers chan worker.BaseProcess } -func NewVector(initialNumOfWorkers uint64) Vector { +func NewVector(initialNumOfWorkers uint64) *Vec { vec := &Vec{ destroy: 0, workers: make(chan worker.BaseProcess, initialNumOfWorkers), diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go deleted file mode 100644 index 29fa3640..00000000 --- a/pkg/worker_watcher/interface.go +++ /dev/null @@ -1,31 +0,0 @@ -package worker_watcher //nolint:stylecheck - -import ( - "context" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// Watcher is an interface for the Sync workers lifecycle -type Watcher interface { - // Watch used to add workers to the container - Watch(workers []worker.BaseProcess) error - - // Get provide first free worker - Get(ctx context.Context) (worker.BaseProcess, error) - - // Push enqueues worker back - Push(w worker.BaseProcess) - - // Allocate - allocates new worker and put it into the WorkerWatcher - Allocate() error - - // Destroy destroys the underlying container - Destroy(ctx context.Context) - - // List return all container w/o removing it from internal storage - List() []worker.BaseProcess - - // Remove will remove worker from the container - Remove(wb worker.BaseProcess) -} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index f82de958..b2d61d48 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -11,8 +11,18 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) +// Vector interface represents vector container +type Vector interface { + // Enqueue used to put worker to the vector + Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector + Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Destroy used to stop releasing the workers + Destroy() +} + // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ container: container.NewVector(numWorkers), numWorkers: numWorkers, @@ -26,7 +36,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events type workerWatcher struct { sync.RWMutex - container container.Vector + container Vector // used to control the Destroy stage (that all workers are in the container) numWorkers uint64 workers []worker.BaseProcess @@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { // Push O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { - if w.State().Value() != worker.StateReady { + switch w.State().Value() { + case worker.StateReady: + ww.container.Enqueue(w) + default: _ = w.Kill() - return } - ww.container.Enqueue(w) } // Destroy all underlying container (but let them to complete the task) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ca5f2f59..1115bd10 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -28,6 +28,9 @@ import ( const ( PluginName string = "websockets" + + RrMode string = "RR_MODE" + RrBroadcastPath string = "RR_BROADCAST_PATH" ) type Plugin struct { @@ -113,7 +116,7 @@ func (p *Plugin) Serve() chan error { AllocateTimeout: p.cfg.Pool.AllocateTimeout, DestroyTimeout: p.cfg.Pool.DestroyTimeout, Supervisor: p.cfg.Pool.Supervisor, - }, map[string]string{"RR_MODE": "http"}) + }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) if err != nil { errCh <- err } @@ -176,7 +179,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { val, err := p.accessValidator(r) p.RUnlock() if err != nil { - p.log.Error("validation error") + p.log.Error("access validation") w.WriteHeader(400) return } @@ -280,7 +283,7 @@ func (p *Plugin) Reset() error { AllocateTimeout: p.cfg.Pool.AllocateTimeout, DestroyTimeout: p.cfg.Pool.DestroyTimeout, Supervisor: p.cfg.Pool.Supervisor, - }, map[string]string{"RR_MODE": "http"}) + }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) if err != nil { return errors.E(op, err) } diff --git a/tests/sleep-ttl.php b/tests/sleep-ttl.php new file mode 100644 index 00000000..2230e615 --- /dev/null +++ b/tests/sleep-ttl.php @@ -0,0 +1,15 @@ +waitPayload()){ + sleep(10); + $rr->respond(new \Spiral\RoadRunner\Payload("hello world")); +} -- cgit v1.2.3 From e2bdbd183e4f15c7104c24ca38ec2853faaf4b93 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 17:01:21 +0300 Subject: Update CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 54 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00a52eac..a11302d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,18 +1,37 @@ CHANGELOG ========= +v2.3.2 (14.07.2021) +------------------- + +## 🩹 Fixes: + +- 🐛 Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749) +- 🐛 Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749) + +## 📈 Summary: + +- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/30?closed=1) + +--- + v2.3.1 (30.06.2021) ------------------- + ## 👀 New: -- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732) +- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` + folder. [PR](https://github.com/spiral/roadrunner/pull/732) - ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736) ## 🩹 Fixes: -- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit reached [PR](https://github.com/spiral/roadrunner/pull/738) -- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next request [PR](https://github.com/spiral/roadrunner/pull/738) -- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719) +- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit + reached [PR](https://github.com/spiral/roadrunner/pull/738) +- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next + request [PR](https://github.com/spiral/roadrunner/pull/738) +- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717) + , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719) - 🐛 Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720) - 🐛 Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128) - 🐛 Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727) @@ -38,20 +57,29 @@ v2.3.0 (08.06.2021) - ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513) -- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711) -- ✏️ Json-schemas for the config file v1.0 (it also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) +- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the + hood. [Issue](https://github.com/spiral/roadrunner/issues/711) +- ✏️ Json-schemas for the config file v1.0 (it also registered + in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) - ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) -- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error code. [Issue](https://github.com/spiral/roadrunner/issues/659) -- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration). [Issue](https://github.com/spiral/roadrunner/issues/489) -- ✏️ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` scripts. [Issue](https://github.com/spiral/roadrunner/issues/658) -- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation), [Issue](https://github.com/spiral/roadrunner/issues/545) +- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error + code. [Issue](https://github.com/spiral/roadrunner/issues/659) +- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration) + . [Issue](https://github.com/spiral/roadrunner/issues/489) +- ✏️ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` + scripts. [Issue](https://github.com/spiral/roadrunner/issues/658) +- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation) + , [Issue](https://github.com/spiral/roadrunner/issues/545) ## 🩹 Fixes: - 🐛 Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686) -- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in logs: [Bug](https://github.com/spiral/roadrunner/issues/659) -- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` error: [Bug](https://github.com/spiral/roadrunner/issues/691) -- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) +- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in + logs: [Bug](https://github.com/spiral/roadrunner/issues/659) +- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` + error: [Bug](https://github.com/spiral/roadrunner/issues/691) +- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the + NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) ## 📦 Packages: -- cgit v1.2.3 From cb28ad07fadb78e2e77e485cd9b96abeddbf3a5c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Jul 2021 17:09:57 +0300 Subject: Fix milestone in the CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a11302d5..336fd58e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ v2.3.2 (14.07.2021) ## 📈 Summary: -- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/30?closed=1) +- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1) --- -- cgit v1.2.3