Diffstat (limited to 'pkg/pool')
-rw-r--r--  pkg/pool/config.go            16
-rw-r--r--  pkg/pool/interface.go         29
-rwxr-xr-x  pkg/pool/static_pool.go      111
-rwxr-xr-x  pkg/pool/static_pool_test.go  86
-rwxr-xr-x  pkg/pool/supervisor_pool.go   23
-rw-r--r--  pkg/pool/supervisor_test.go   94
6 files changed, 258 insertions(+), 101 deletions(-)
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index e3e2d3cd..782f7ce9 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -12,12 +12,12 @@ type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by the Swapper during a hot-swap. Defaults to the number of CPU cores.
- NumWorkers int64 `mapstructure:"num_workers"`
+ NumWorkers uint64 `mapstructure:"num_workers"`
// MaxJobs defines how many executions are allowed for the worker until
// its destruction. Set 1 to create a new process for each new task, 0 to let
// the worker handle as many tasks as it can.
- MaxJobs int64 `mapstructure:"max_jobs"`
+ MaxJobs uint64 `mapstructure:"max_jobs"`
// AllocateTimeout defines how long the pool will wait for a worker to
// be freed to handle the task. Defaults to 60s.
@@ -34,7 +34,7 @@ type Config struct {
// InitDefaults enables default config values.
func (cfg *Config) InitDefaults() {
if cfg.NumWorkers == 0 {
- cfg.NumWorkers = int64(runtime.NumCPU())
+ cfg.NumWorkers = uint64(runtime.NumCPU())
}
if cfg.AllocateTimeout == 0 {
@@ -52,16 +52,16 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of a worker.
- WatchTick uint64 `mapstructure:"watch_tick"`
+ WatchTick time.Duration `mapstructure:"watch_tick"`
// TTL defines the maximum time a worker is allowed to live.
- TTL uint64 `mapstructure:"ttl"`
+ TTL time.Duration `mapstructure:"ttl"`
// IdleTTL defines the maximum duration a worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64 `mapstructure:"idle_ttl"`
+ IdleTTL time.Duration `mapstructure:"idle_ttl"`
// ExecTTL defines the maximum lifetime per job.
- ExecTTL uint64 `mapstructure:"exec_ttl"`
+ ExecTTL time.Duration `mapstructure:"exec_ttl"`
// MaxWorkerMemory limits memory per worker.
MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
@@ -70,6 +70,6 @@ type SupervisorConfig struct {
// InitDefaults enables default config values.
func (cfg *SupervisorConfig) InitDefaults() {
if cfg.WatchTick == 0 {
- cfg.WatchTick = 1
+ cfg.WatchTick = time.Second
}
}
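With this change, supervisor timing values are plain time.Duration values rather than bare second counts. A minimal sketch of the new shape (the values are illustrative and mirror the test configs later in this diff; imports follow the hunks above):

    cfg := &SupervisorConfig{
        WatchTick:       time.Second,       // previously the bare count 1
        TTL:             100 * time.Second, // previously 100
        IdleTTL:         100 * time.Second,
        ExecTTL:         100 * time.Second,
        MaxWorkerMemory: 100, // still a plain number of megabytes
    }
    cfg.InitDefaults() // a zero WatchTick now defaults to time.Second

Note that decoding a value such as "1s" from a config file into time.Duration needs a duration-aware decode hook (for example mapstructure's StringToTimeDurationHookFunc); whether the surrounding config plugin registers one is not visible in this diff.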
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
new file mode 100644
index 00000000..4f7ae595
--- /dev/null
+++ b/pkg/pool/interface.go
@@ -0,0 +1,29 @@
+package pool
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Pool manages a set of inner worker processes.
+type Pool interface {
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec executes the task with the given payload
+ Exec(rqs payload.Payload) (payload.Payload, error)
+
+ // ExecWithContext executes the task with a context, which is used to enforce a timeout
+ ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []worker.SyncWorker)
+
+ // RemoveWorker removes a worker from the pool.
+ RemoveWorker(worker worker.SyncWorker) error
+
+ // Destroy destroys the underlying worker stack (but lets the workers complete the current task).
+ Destroy(ctx context.Context)
+}
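For orientation, a sketch of driving this interface end to end. Initialize (see static_pool.go below) returns a Pool; the worker script name is a hypothetical placeholder, and the imports (context, fmt, os/exec, time, pipe, payload) match those used by the tests later in this diff:

    ctx := context.Background()
    p, err := Initialize(ctx,
        func() *exec.Cmd { return exec.Command("php", "worker.php", "pipes") }, // hypothetical script
        pipe.NewPipeFactory(),
        Config{NumWorkers: 2, AllocateTimeout: time.Second, DestroyTimeout: time.Second},
    )
    if err != nil {
        panic(err)
    }
    defer p.Destroy(ctx)

    rsp, err := p.Exec(payload.Payload{Body: []byte("hello")})
    if err != nil {
        panic(err)
    }
    fmt.Println(string(rsp.Body))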
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 438f936f..44adf9c0 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -6,13 +6,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encodes an error or makes a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -34,7 +32,7 @@ type StaticPool struct {
cmd Command
// creates and connects to the worker stack
- factory worker.Factory
+ factory transport.Factory
// distributes the events
events events.Handler
@@ -43,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww worker.Watcher
+ ww workerWatcher.Watcher
// allocate new worker
allocator worker.Allocator
@@ -53,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates a new worker pool and task multiplexer. StaticPool will initiate with the configured number of workers.
-func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
cfg: cfg,
cmd: cmd,
factory: factory,
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
}
// add pool options
@@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
}
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
- p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
@@ -124,16 +122,17 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
return sp.ww.RemoveWorker(wb)
}
+// Be careful: keep Exec in sync with ExecWithContext
func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec")
+ const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
@@ -152,28 +151,21 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// worker wants to be terminated
// TODO: careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- w.State().Set(internal.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
+ sp.stopWorker(w)
return sp.Exec(p)
}
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
- }
- } else {
- sp.ww.PushWorker(w)
+ err = sp.checkMaxJobs(w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+// Be careful: keep this in sync with the pool.Exec method
+func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
defer cancel()
@@ -182,32 +174,46 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.ExecWithTimeout(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, p)
if err != nil {
return sp.errEncoder(err, w)
}
// worker wants to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(internal.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+ return sp.ExecWithContext(ctx, p)
+ }
+
+ err = sp.checkMaxJobs(w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
- return sp.ExecWithContext(ctx, rqs)
+ return rsp, nil
+}
+
+func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
+ const op = errors.Op("static_pool_stop_worker")
+ w.State().Set(internal.StateInvalid)
+ err := w.Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
+}
+// checkMaxJobs checks the worker's number of executions and allocates a new worker if that number exceeds sp.cfg.MaxJobs
+func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
+ const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ err := sp.ww.AllocateNew()
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return errors.E(op, err)
}
} else {
sp.ww.PushWorker(w)
}
-
- return rsp, nil
+ return nil
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
@@ -222,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// otherwise, if err is not nil, return the error
return nil, errors.E(op, err)
}
- return w.(worker.SyncWorker), nil
+ return w, nil
}
// Destroy destroys the underlying worker stack (but lets the workers complete the current task).
@@ -231,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push an event if a timeout error occurred at any stage
if errors.Is(errors.ExecTTL, err) {
@@ -269,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (worker.BaseProcess, error) {
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (*worker.SyncWorkerImpl, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -278,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, err
- }
+ sw := worker.From(w)
sp.events.Push(events.PoolEvent{
Event: events.EventWorkerConstruct,
@@ -297,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, err
}
- r, err := sw.(worker.SyncWorker).Exec(p)
+ r, err := sw.Exec(p)
if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
@@ -307,22 +310,18 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate the required number of workers
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ var workers []worker.SyncWorker
// a constant number of workers simplifies the logic
- for i := int64(0); i < numWorkers; i++ {
+ for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, errors.E(op, err)
- }
- workers = append(workers, sw)
+ workers = append(workers, w)
}
return workers, nil
}
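The listeners slice above is populated through the Options variadic. A sketch of subscribing to pool events with AddListeners, an Option defined elsewhere in this package and exercised by the tests below (ctx, cmd, and cfg as in the interface sketch earlier, plus a log import):

    listener := func(event interface{}) {
        if ev, ok := event.(events.PoolEvent); ok {
            log.Println("pool event:", ev.Event) // e.g. EventWorkerConstruct
        }
    }
    p, err := Initialize(ctx, cmd, pipe.NewPipeFactory(), cfg, AddListeners(listener))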
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index f66895dc..a32790e0 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,15 +12,15 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/stretchr/testify/assert"
)
var cfg = Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
}
@@ -489,6 +489,84 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p.Destroy(context.Background())
}
+func Test_StaticPool_NoFreeWorkers(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 1)
+
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventNoFreeWorkers {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ // the PHP script sleeps for 3 seconds
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: false,
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: nil,
+ },
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ go func() {
+ _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ }()
+
+ time.Sleep(time.Second)
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+// the pool must fail to initialize when the command binary does not exist
+func Test_Static_Pool_WrongCommand1(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+// the pool must fail to initialize when the worker script path is empty
+func Test_Static_Pool_WrongCommand2(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
@@ -518,7 +596,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
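A note on the timing in Test_StaticPool_NoFreeWorkers above: the background goroutine occupies the pool's single worker for roughly three seconds, while AllocateTimeout is only one second, so the second ExecWithContext call fails while waiting for a free worker and the listener observes EventNoFreeWorkers.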
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 19cda759..2597b352 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -6,11 +6,10 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -20,7 +19,7 @@ const MB = 1024 * 1024
const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
type Supervised interface {
- pool.Pool
+ Pool
// Start used to start watching process for all pool workers
Start()
}
@@ -28,12 +27,12 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
events events.Handler
- pool pool.Pool
+ pool Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func supervisorWrapper(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
sp := &supervised{
cfg: cfg,
events: events,
@@ -57,7 +56,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second)
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
defer cancel()
go func() {
res, err := sp.pool.ExecWithContext(ctx, rqs)
@@ -101,13 +100,13 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+func (sp *supervised) Workers() (workers []worker.SyncWorker) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
return sp.pool.RemoveWorker(worker)
}
@@ -117,7 +116,7 @@ func (sp *supervised) Destroy(ctx context.Context) {
func (sp *supervised) Start() {
go func() {
- watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second)
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
for {
select {
case <-sp.stopCh:
@@ -155,7 +154,7 @@ func (sp *supervised) control() {
continue
}
- if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
@@ -210,7 +209,7 @@ func (sp *supervised) control() {
// IdleTTL is 1 second.
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
- if int64(sp.cfg.IdleTTL)-res <= 0 {
+ if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
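Callers obtain a Supervised pool by setting Config.Supervisor; below is a sketch under the assumption (not shown in this diff) that Initialize wraps the static pool via supervisorWrapper and calls Start() when Supervisor is non-nil. The values are illustrative and cmd is the same hypothetical command factory as in the earlier sketches:

    cfg := Config{
        NumWorkers:      1,
        AllocateTimeout: time.Second,
        DestroyTimeout:  time.Second,
        Supervisor: &SupervisorConfig{
            WatchTick:       time.Second,
            ExecTTL:         10 * time.Second, // jobs running longer than this are cancelled
            MaxWorkerMemory: 128,              // megabytes, see const MB above
        },
    }
    p, err := Initialize(ctx, cmd, pipe.NewPipeFactory(), cfg)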
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index b3358965..c67d5d91 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -6,21 +6,22 @@ import (
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/spiral/roadrunner/v2/tools"
"github.com/stretchr/testify/assert"
)
var cfgSupervised = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 100,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 100 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -73,14 +74,14 @@ func TestSupervisedPool_Exec(t *testing.T) {
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 1,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 1 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -114,14 +115,14 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 1,
- ExecTTL: 100,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 1 * time.Second,
+ ExecTTL: 100 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -155,14 +156,14 @@ func TestSupervisedPool_Idle(t *testing.T) {
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 4,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -194,3 +195,54 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
// should be the same pid
assert.Equal(t, pid, p.Workers()[0].Pid())
}
+
+func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 1,
+ },
+ }
+
+ block := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventMaxMemory {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ // expected event order: constructed -> max memory -> constructed (replacement worker)
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ AddListeners(listener),
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ <-block
+ p.Destroy(context.Background())
+}
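The 1 MB MaxWorkerMemory ceiling here (interpreted in megabytes via the MB constant in supervisor_pool.go) lets the leaking script trip the limit on an early watch tick; the test unblocks once EventMaxMemory is pushed, after which the offending worker is removed and replaced, matching the expected event order noted above.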