summaryrefslogtreecommitdiff
path: root/pool
diff options
context:
space:
mode:
Diffstat (limited to 'pool')
-rwxr-xr-xpool/static_pool.go61
-rwxr-xr-xpool/static_pool_test.go84
-rwxr-xr-xpool/supervisor_pool.go17
-rw-r--r--pool/supervisor_test.go29
4 files changed, 83 insertions, 108 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 7481f84f..019c34b2 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,7 +2,6 @@ package pool
import (
"context"
- "fmt"
"os/exec"
"time"
@@ -13,13 +12,12 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
+ "go.uber.org/zap"
)
const (
// StopRequest can be sent by worker to indicate that restart is required.
StopRequest = `{"stop":true}`
- // pluginName ...
- pluginName = "pool"
)
// ErrorEncoder encode error or make a decision based on the error type
@@ -32,6 +30,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
cfg *Config
+ log *zap.Logger
// worker command creator
cmd Command
@@ -39,9 +38,6 @@ type StaticPool struct {
// creates and connects to stack
factory transport.Factory
- events events.EventBus
- eventsID string
-
// manages worker states and TTLs
ww Watcher
@@ -52,8 +48,8 @@ type StaticPool struct {
errEncoder ErrorEncoder
}
-// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
if factory == nil {
return nil, errors.Str("no factory initialized")
}
@@ -64,13 +60,10 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
cfg.MaxJobs = 1
}
- eb, id := events.Bus()
p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: eb,
- eventsID: id,
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
}
// add pool options
@@ -78,10 +71,19 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ if p.log == nil {
+ z, err := zap.NewProduction()
+ if err != nil {
+ return nil, err
+ }
+
+ p.log = z
+ }
+
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -99,7 +101,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, p.log, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -108,6 +110,12 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return p, nil
}
+func WithLogger(z *zap.Logger) Options {
+ return func(p *StaticPool) {
+ p.log = z
+ }
+}
+
// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
@@ -158,7 +166,6 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.events.Unsubscribe(sp.eventsID)
sp.ww.Destroy(ctx)
}
@@ -183,13 +190,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+ sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+ sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "worker error"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
// mark old as invalid and stop
@@ -209,15 +215,14 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+ sp.log.Warn("network error, worker will be restarted", zap.String("reason", "network"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// kill the worker instead of sending net packet to it
_ = w.Kill()
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+ sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -268,7 +273,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, worker's pid: %d", err.Error(), w.Pid())))
+ sp.log.Warn("user requested worker to be stopped", zap.String("reason", "user event"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
}
}
@@ -289,7 +294,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("no free workers in the pool, error: %s", err)))
+ sp.log.Error("no free workers in the pool, wait timeout exceed", zap.String("reason", "no free workers"), zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err))
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -310,7 +315,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("worker allocated, pid: %d", sw.Pid())))
+ sp.log.Debug("worker is allocated", zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return sw, nil
}
}
@@ -336,7 +341,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// destroy the worker
err = sw.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
return nil, err
}
@@ -363,7 +368,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
err = sw.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
return nil, err
}
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index a45aa29d..4f98ca91 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -6,19 +6,18 @@ import (
"os/exec"
"runtime"
"strconv"
- "strings"
"sync"
"testing"
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.uber.org/zap"
)
var cfg = &Config{
@@ -29,7 +28,7 @@ var cfg = &Config{
func Test_NewPool(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -44,7 +43,7 @@ func Test_NewPool(t *testing.T) {
func Test_NewPoolReset(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -71,7 +70,7 @@ func Test_NewPoolReset(t *testing.T) {
}
func Test_StaticPool_Invalid(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") },
pipe.NewPipeFactory(),
@@ -83,7 +82,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
}
func Test_ConfigNoErrorInitDefaults(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -100,7 +99,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
func Test_StaticPool_Echo(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -124,7 +123,7 @@ func Test_StaticPool_Echo(t *testing.T) {
func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -148,7 +147,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
func Test_StaticPool_Echo_Context(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") },
pipe.NewPipeFactory(),
@@ -172,7 +171,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
func Test_StaticPool_JobError(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
@@ -198,17 +197,15 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ z, err := zap.NewProduction()
require.NoError(t, err)
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
pipe.NewPipeFactory(),
cfg,
+ WithLogger(z),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -218,31 +215,19 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- event := <-ch
- if !strings.Contains(event.Message(), "undefined_function()") {
- t.Fatal("event should contain undefiled function()")
- }
-
p.Destroy(ctx)
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Run pool events
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
- require.NoError(t, err)
-
var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
}
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -264,7 +249,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, 1, len(p.Workers()))
// first creation
- <-ch
+ time.Sleep(time.Second * 2)
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
@@ -272,8 +257,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
// re-creation
- <-ch
-
+ time.Sleep(time.Second * 2)
list := p.Workers()
for _, w := range list {
assert.Equal(t, worker.StateReady, w.State().Value())
@@ -281,7 +265,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -300,7 +284,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
@@ -339,7 +323,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
@@ -381,7 +365,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
@@ -422,7 +406,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -444,7 +428,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -474,7 +458,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Handle_Dead(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -499,7 +483,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -519,13 +503,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
- require.NoError(t, err)
-
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
@@ -550,14 +528,14 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-ch
+ time.Sleep(time.Second)
p.Destroy(ctx)
}
// identical to replace but controlled on worker side
func Test_Static_Pool_WrongCommand1(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -574,7 +552,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_WrongCommand2(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -591,7 +569,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
func Test_CRC_WithPayload(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
pipe.NewPipeFactory(),
@@ -623,7 +601,7 @@ Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allo
*/
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -655,7 +633,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -697,7 +675,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 0502dc9a..59834859 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,7 +2,6 @@ package pool
import (
"context"
- "fmt"
"sync"
"time"
@@ -11,11 +10,11 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/state/process"
"github.com/spiral/roadrunner/v2/worker"
+ "go.uber.org/zap"
)
const (
- MB = 1024 * 1024
- supervisorName string = "supervisor"
+ MB = 1024 * 1024
)
// NSEC_IN_SEC nanoseconds in second
@@ -29,17 +28,17 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
- events events.EventBus
pool Pool
+ log *zap.Logger
stopCh chan struct{}
mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, log *zap.Logger, cfg *SupervisorConfig) *supervised {
sp := &supervised{
cfg: cfg,
- events: eb,
pool: pool,
+ log: log,
mu: &sync.RWMutex{},
stopCh: make(chan struct{}),
}
@@ -166,7 +165,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("ttl reached, worker's pid: %d", workers[i].Pid())))
+ sp.log.Debug("ttl", zap.String("reason", "ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String()))
continue
}
@@ -186,7 +185,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("max memory reached, worker's pid: %d", workers[i].Pid())))
+ sp.log.Debug("memory_limit", zap.String("reason", "max memory is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventMaxMemory.String()))
continue
}
@@ -241,7 +240,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("idle ttl reached, worker's pid: %d", workers[i].Pid())))
+ sp.log.Debug("idle_ttl", zap.String("reason", "idle ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String()))
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 6e8ab552..6ff62316 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/worker"
@@ -30,7 +29,7 @@ var cfgSupervised = &Config{
func TestSupervisedPool_Exec(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
@@ -60,7 +59,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
func Test_SupervisedPoolReset(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -91,7 +90,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
cfgSupervised.Debug = true
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") },
pipe.NewPipeFactory(),
@@ -129,7 +128,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
@@ -164,7 +163,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") },
pipe.NewPipeFactory(),
@@ -221,7 +220,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") },
pipe.NewPipeFactory(),
@@ -271,7 +270,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
pipe.NewPipeFactory(),
@@ -319,7 +318,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
pipe.NewPipeFactory(),
@@ -361,17 +360,11 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
},
}
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
- require.NoError(t, err)
-
// constructed
// max memory
// constructed
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
@@ -390,7 +383,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.Empty(t, resp.Body)
assert.Empty(t, resp.Context)
- <-ch
+ time.Sleep(time.Second)
p.Destroy(context.Background())
}
@@ -406,7 +399,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") },
pipe.NewPipeFactory(),