summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 17:54:58 +0300
committerValery Piashchynski <[email protected]>2021-01-23 17:54:58 +0300
commit01a6fab935fc2e40d7b6c17ab75a20a74ca23d1d (patch)
tree59967433eb8220e60020011873623a7454941314 /pkg
parent29d6020a9e8a3713b22269ed946547c96c24d3da (diff)
Stabilization PR
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pool/static_pool.go64
-rwxr-xr-xpkg/pool/static_pool_test.go78
-rw-r--r--pkg/pool/supervisor_test.go45
3 files changed, 159 insertions, 28 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 438f936f..d1b726c1 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -132,8 +132,9 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
+// Be careful, sync Exec with ExecWithContext
func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec")
+ const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
@@ -152,28 +153,21 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// worker want's to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- w.State().Set(internal.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
+ sp.stopWorker(&w)
return sp.Exec(p)
}
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
- }
- } else {
- sp.ww.PushWorker(w)
+ err = sp.checkMaxJobs(&w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+// Be careful, sync with pool.Exec method
+func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
defer cancel()
@@ -182,32 +176,46 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.ExecWithTimeout(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, p)
if err != nil {
return sp.errEncoder(err, w)
}
// worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(internal.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(&w)
+ return sp.ExecWithContext(ctx, p)
+ }
- return sp.ExecWithContext(ctx, rqs)
+ err = sp.checkMaxJobs(&w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ return rsp, nil
+}
+
+func (sp *StaticPool) stopWorker(w *worker.SyncWorker) {
+ const op = errors.Op("static_pool_stop_worker")
+ (*w).State().Set(internal.StateInvalid)
+ err := (*w).Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)})
+ }
+}
+
+// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
+func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error {
+ const op = errors.Op("static_pool_check_max_jobs")
+ if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew()
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.PushWorker(*w)
}
-
- return rsp, nil
+ return nil
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index f66895dc..348f5297 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -489,6 +489,84 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p.Destroy(context.Background())
}
+func Test_StaticPool_NoFreeWorkers(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 1)
+
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventNoFreeWorkers {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ // sleep for the 3 seconds
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: false,
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: nil,
+ },
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ go func() {
+ _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ }()
+
+ time.Sleep(time.Second)
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand1(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand2(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index b3358965..a9424cd5 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -194,3 +194,48 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
// should be the same pid
assert.Equal(t, pid, p.Workers()[0].Pid())
}
+
+func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 4,
+ MaxWorkerMemory: 1,
+ },
+ }
+
+ // constructed
+ // max memory
+ // constructed
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 2)
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}