summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-26 14:08:33 +0300
committerValery Piashchynski <[email protected]>2021-06-26 14:08:33 +0300
commit53e50a05bd27ecec03695b69defd920fc4a25c5c (patch)
treee86ca391e5a85118098c6340a0f0ae86747db042 /pkg/pool
parentad1ca84b26bb6a4ba410a8a684fe3d2e2f86eaea (diff)
parentfc540f6029772ff51913b8ee3c082f8197010c52 (diff)
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
Diffstat (limited to 'pkg/pool')
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/supervisor_pool.go105
-rw-r--r--pkg/pool/supervisor_test.go48
3 files changed, 81 insertions, 74 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 74e06b81..1c149c51 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
return sp.execDebugWithTTL(ctx, p)
}
- ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxAlloc, op)
if err != nil {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index ca61dbc4..b09b6f6c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-type ttlExec struct {
- err error
- p payload.Payload
-}
-
-func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
- const op = errors.Op("supervised_exec_with_context")
- if sp.cfg.ExecTTL == 0 {
- return sp.pool.Exec(rqs)
- }
-
- c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
- defer cancel()
- go func() {
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- c <- ttlExec{
- err: errors.E(op, err),
- p: payload.Payload{},
- }
- }
-
- c <- ttlExec{
- err: nil,
- p: res,
- }
- }()
-
- for {
- select {
- case <-ctx.Done():
- return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
-
- return res.p, nil
- }
- }
+func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+ panic("used to satisfy pool interface")
}
func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
@@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
return sp.pool.Exec(rqs)
}
- c := make(chan ttlExec, 1)
ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
defer cancel()
- go func() {
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- c <- ttlExec{
- err: errors.E(op, err),
- p: payload.Payload{},
- }
- }
-
- c <- ttlExec{
- err: nil,
- p: res,
- }
- }()
-
- for {
- select {
- case <-ctx.Done():
- return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
- return res.p, nil
- }
+ res, err := sp.pool.execWithTTL(ctx, rqs)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
+
+ return res, nil
}
func (sp *supervised) GetConfig() interface{} {
@@ -164,7 +104,7 @@ func (sp *supervised) Stop() {
sp.stopCh <- struct{}{}
}
-func (sp *supervised) control() {
+func (sp *supervised) control() { //nolint:gocognit
now := time.Now()
// MIGHT BE OUTDATED
@@ -172,7 +112,16 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == worker.StateInvalid {
+ // if worker not in the Ready OR working state
+ // skip such worker
+ switch workers[i].State().Value() {
+ case
+ worker.StateInvalid,
+ worker.StateErrored,
+ worker.StateDestroyed,
+ worker.StateInactive,
+ worker.StateStopped,
+ worker.StateStopping:
continue
}
@@ -183,12 +132,23 @@ func (sp *supervised) control() {
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- workers[i].State().Set(worker.StateInvalid)
+ // SOFT termination. DO NOT STOP active workers
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ // SOFT termination. DO NOT STOP active workers
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+
+ // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -230,6 +190,11 @@ func (sp *supervised) control() {
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 348622c7..06cbe904 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.execWithTTL(context.Background(), payload.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.execWithTTL(context.Background(), payload.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.execWithTTL(context.Background(), payload.Payload{
+ _, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -170,6 +170,48 @@ func TestSupervisedPool_Idle(t *testing.T) {
p.Destroy(context.Background())
}
+func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 1 * time.Second,
+ IdleTTL: 1 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 2)
+ // should be destroyed, state should be Ready, not Invalid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ assert.Equal(t, int64(1), p.Workers()[0].State().Value())
+}
+
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = &Config{
NumWorkers: uint64(1),