summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md54
-rw-r--r--pkg/pool/interface.go24
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/supervisor_pool.go39
-rw-r--r--pkg/pool/supervisor_test.go54
-rwxr-xr-xpkg/worker/sync_worker.go7
-rw-r--r--pkg/worker_watcher/container/vec.go2
-rw-r--r--pkg/worker_watcher/interface.go31
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go21
-rw-r--r--plugins/websockets/plugin.go9
-rw-r--r--tests/sleep-ttl.php15
11 files changed, 198 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00a52eac..336fd58e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,18 +1,37 @@
CHANGELOG
=========
+v2.3.2 (14.07.2021)
+-------------------
+
+## 🩹 Fixes:
+
+- 🐛 Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749)
+- 🐛 Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749)
+
+## 📈 Summary:
+
+- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1)
+
+---
+
v2.3.1 (30.06.2021)
-------------------
+
## 👀 New:
-- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
+- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc`
+ folder. [PR](https://github.com/spiral/roadrunner/pull/732)
- ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)
## 🩹 Fixes:
-- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit reached [PR](https://github.com/spiral/roadrunner/pull/738)
-- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next request [PR](https://github.com/spiral/roadrunner/pull/738)
-- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
+- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit
+ reached [PR](https://github.com/spiral/roadrunner/pull/738)
+- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next
+ request [PR](https://github.com/spiral/roadrunner/pull/738)
+- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717)
+ , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
- 🐛 Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
- 🐛 Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128)
- 🐛 Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727)
@@ -38,20 +57,29 @@ v2.3.0 (08.06.2021)
- ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of
thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513)
-- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
-- ✏️ Json-schemas for the config file v1.0 (it also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
+- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the
+ hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
+- ✏️ Json-schemas for the config file v1.0 (it also registered
+ in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
- ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
-- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error code. [Issue](https://github.com/spiral/roadrunner/issues/659)
-- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration). [Issue](https://github.com/spiral/roadrunner/issues/489)
-- ✏️ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
-- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation), [Issue](https://github.com/spiral/roadrunner/issues/545)
+- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error
+ code. [Issue](https://github.com/spiral/roadrunner/issues/659)
+- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration)
+ . [Issue](https://github.com/spiral/roadrunner/issues/489)
+- ✏️ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh`
+ scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
+- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation)
+ , [Issue](https://github.com/spiral/roadrunner/issues/545)
## 🩹 Fixes:
- 🐛 Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686)
-- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
-- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` error: [Bug](https://github.com/spiral/roadrunner/issues/691)
-- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
+- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in
+ logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
+- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob`
+ error: [Bug](https://github.com/spiral/roadrunner/issues/691)
+- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the
+ NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
## 📦 Packages:
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index c22fbbd3..bbf7653e 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -27,3 +27,27 @@ type Pool interface {
// ExecWithContext executes task with context which is used with timeout
execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}
+
+// Watcher is an interface for the Sync workers lifecycle
+type Watcher interface {
+ // Watch used to add workers to the container
+ Watch(workers []worker.BaseProcess) error
+
+ // Get provide first free worker
+ Get(ctx context.Context) (worker.BaseProcess, error)
+
+ // Push enqueues worker back
+ Push(w worker.BaseProcess)
+
+ // Allocate - allocates new worker and put it into the WorkerWatcher
+ Allocate() error
+
+ // Destroy destroys the underlying container
+ Destroy(ctx context.Context)
+
+ // List return all container w/o removing it from internal storage
+ List() []worker.BaseProcess
+
+ // Remove will remove worker from the container
+ Remove(wb worker.BaseProcess)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e568661f..5a6247b5 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -41,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww workerWatcher.Watcher
+ ww Watcher
// allocate new worker
allocator worker.Allocator
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index b09b6f6c..4b990dbe 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit
worker.StateDestroyed,
worker.StateInactive,
worker.StateStopped,
- worker.StateStopping:
+ worker.StateStopping,
+ worker.StateKilling:
continue
}
@@ -132,23 +133,40 @@ func (sp *supervised) control() { //nolint:gocognit
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- // SOFT termination. DO NOT STOP active workers
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
+ // just to double check
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- // SOFT termination. DO NOT STOP active workers
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
- // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done
+ // just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
+ // just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 513d369f..1cd301ba 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -9,7 +9,9 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfgSupervised = Config{
@@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}
+func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+ pid = p.Workers()[0].Pid()
+
+ resp, err = p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+
+ p.Destroy(context.Background())
+}
+
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 84ff5977..02f11d0b 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -60,6 +60,13 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}
+ // supervisor may set state of the worker during the work
+ // in this case we should not re-write the worker state
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ return rsp, nil
+ }
+
tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index b9150c43..24b5fa6d 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -13,7 +13,7 @@ type Vec struct {
workers chan worker.BaseProcess
}
-func NewVector(initialNumOfWorkers uint64) Vector {
+func NewVector(initialNumOfWorkers uint64) *Vec {
vec := &Vec{
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
deleted file mode 100644
index 29fa3640..00000000
--- a/pkg/worker_watcher/interface.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package worker_watcher //nolint:stylecheck
-
-import (
- "context"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Watcher is an interface for the Sync workers lifecycle
-type Watcher interface {
- // Watch used to add workers to the container
- Watch(workers []worker.BaseProcess) error
-
- // Get provide first free worker
- Get(ctx context.Context) (worker.BaseProcess, error)
-
- // Push enqueues worker back
- Push(w worker.BaseProcess)
-
- // Allocate - allocates new worker and put it into the WorkerWatcher
- Allocate() error
-
- // Destroy destroys the underlying container
- Destroy(ctx context.Context)
-
- // List return all container w/o removing it from internal storage
- List() []worker.BaseProcess
-
- // Remove will remove worker from the container
- Remove(wb worker.BaseProcess)
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index f82de958..b2d61d48 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,8 +11,18 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Destroy used to stop releasing the workers
+ Destroy()
+}
+
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
numWorkers: numWorkers,
@@ -26,7 +36,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events
type workerWatcher struct {
sync.RWMutex
- container container.Vector
+ container Vector
// used to control the Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// Push O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
- if w.State().Value() != worker.StateReady {
+ switch w.State().Value() {
+ case worker.StateReady:
+ ww.container.Enqueue(w)
+ default:
_ = w.Kill()
- return
}
- ww.container.Enqueue(w)
}
// Destroy all underlying container (but let them to complete the task)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index ca5f2f59..1115bd10 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -28,6 +28,9 @@ import (
const (
PluginName string = "websockets"
+
+ RrMode string = "RR_MODE"
+ RrBroadcastPath string = "RR_BROADCAST_PATH"
)
type Plugin struct {
@@ -113,7 +116,7 @@ func (p *Plugin) Serve() chan error {
AllocateTimeout: p.cfg.Pool.AllocateTimeout,
DestroyTimeout: p.cfg.Pool.DestroyTimeout,
Supervisor: p.cfg.Pool.Supervisor,
- }, map[string]string{"RR_MODE": "http"})
+ }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
if err != nil {
errCh <- err
}
@@ -176,7 +179,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
val, err := p.accessValidator(r)
p.RUnlock()
if err != nil {
- p.log.Error("validation error")
+ p.log.Error("access validation")
w.WriteHeader(400)
return
}
@@ -280,7 +283,7 @@ func (p *Plugin) Reset() error {
AllocateTimeout: p.cfg.Pool.AllocateTimeout,
DestroyTimeout: p.cfg.Pool.DestroyTimeout,
Supervisor: p.cfg.Pool.Supervisor,
- }, map[string]string{"RR_MODE": "http"})
+ }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
if err != nil {
return errors.E(op, err)
}
diff --git a/tests/sleep-ttl.php b/tests/sleep-ttl.php
new file mode 100644
index 00000000..2230e615
--- /dev/null
+++ b/tests/sleep-ttl.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->waitPayload()){
+ sleep(10);
+ $rr->respond(new \Spiral\RoadRunner\Payload("hello world"));
+}