summaryrefslogtreecommitdiff
path: root/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
committerValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
commit9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /pool
parent160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool')
-rwxr-xr-xpool/static_pool.go102
-rwxr-xr-xpool/static_pool_test.go58
-rwxr-xr-xpool/supervisor_pool.go49
-rw-r--r--pool/supervisor_test.go15
4 files changed, 124 insertions, 100 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 91bd1c2c..27db830c 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"os/exec"
"time"
@@ -14,8 +15,12 @@ import (
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
)
-// StopRequest can be sent by worker to indicate that restart is required.
-const StopRequest = "{\"stop\":true}"
+const (
+ // StopRequest can be sent by worker to indicate that restart is required.
+ StopRequest = `{"stop":true}`
+ // pluginName ...
+ pluginName = "pool"
+)
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
@@ -34,11 +39,8 @@ type StaticPool struct {
// creates and connects to stack
factory transport.Factory
- // distributes the events
- events events.Handler
-
- // saved list of event listeners
- listeners []events.Listener
+ events events.EventBus
+ eventsID string
// manages worker states and TTLs
ww Watcher
@@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
cfg.MaxJobs = 1
}
+ eb, id := events.Bus()
p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: events.NewEventsHandler(),
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: eb,
+ eventsID: id,
}
// add pool options
@@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return p, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *StaticPool) {
- p.listeners = listeners
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
-// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.Listener) {
- sp.events.AddListener(listener)
-}
-
// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
@@ -205,7 +195,11 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()),
+ })
}
}
@@ -227,7 +221,11 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)})
+ sp.events.Send(&events.RREvent{
+ T: events.EventNoFreeWorkers,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s", err),
+ })
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -238,6 +236,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.events.Unsubscribe(sp.eventsID)
sp.ww.Destroy(ctx)
}
@@ -246,12 +245,20 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventExecTTL,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s", err),
+ })
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+ })
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -272,7 +279,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+ })
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -280,7 +291,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerDestruct,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+ })
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -296,7 +311,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd())
if err != nil {
return nil, err
}
@@ -304,9 +319,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Push(events.PoolEvent{
- Event: events.EventWorkerConstruct,
- Payload: sw,
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerConstruct,
+ P: pluginName,
+ M: fmt.Sprintf("pid: %d", sw.Pid()),
})
return sw, nil
}
@@ -329,7 +345,11 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw.State().Set(worker.StateDestroyed)
err = sw.Kill()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
+ })
return nil, err
}
@@ -346,7 +366,11 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// redirect call to the worker with TTL
r, err := sw.ExecWithTTL(ctx, p)
if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
+ })
}
return r, err
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 9861f0d8..abef3779 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -18,6 +18,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfg = &Config{
@@ -167,26 +168,17 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
-
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
pipe.NewPipeFactory(),
cfg,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -196,22 +188,22 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ event := <-ch
+ if !strings.Contains(event.Message(), "undefined_function()") {
+ t.Fatal("event should contain undefiled function()")
+ }
p.Destroy(ctx)
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
+
// Run pool events
- ev := make(chan struct{}, 1)
- listener := func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- ev <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
+ require.NoError(t, err)
var cfg2 = &Config{
NumWorkers: 1,
@@ -224,7 +216,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
cfg2,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -242,7 +233,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, 1, len(p.Workers()))
// first creation
- <-ev
+ <-ch
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
@@ -250,7 +241,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
// re-creation
- <-ev
+ <-ch
list := p.Workers()
for _, w := range list {
@@ -496,15 +487,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventNoFreeWorkers {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
@@ -518,7 +505,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
DestroyTimeout: time.Second,
Supervisor: nil,
},
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -532,7 +518,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ <-ch
p.Destroy(ctx)
}
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 99af168c..c1fb6eec 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"sync"
"time"
@@ -12,7 +13,10 @@ import (
"github.com/spiral/roadrunner/v2/worker"
)
-const MB = 1024 * 1024
+const (
+ MB = 1024 * 1024
+ supervisorName string = "supervisor"
+)
// NSEC_IN_SEC nanoseconds in second
const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
@@ -24,20 +28,23 @@ type Supervised interface {
}
type supervised struct {
- cfg *SupervisorConfig
- events events.Handler
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
+ cfg *SupervisorConfig
+ events events.EventBus
+ eventsID string
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
+ eb, id := events.Bus()
sp := &supervised{
- cfg: cfg,
- events: events,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
+ cfg: cfg,
+ events: eb,
+ eventsID: id,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
}
return sp
@@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventTTL,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
continue
}
@@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventMaxMemory,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
continue
}
@@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventIdleTTL,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index aca379c6..c3abf85e 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -326,14 +326,10 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
},
}
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventMaxMemory {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
+ require.NoError(t, err)
// constructed
// max memory
@@ -344,7 +340,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
cfgExecTTL,
- AddListeners(listener),
)
assert.NoError(t, err)
@@ -359,7 +354,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.Empty(t, resp.Body)
assert.Empty(t, resp.Context)
- <-block
+ <-ch
p.Destroy(context.Background())
}