diff options
author | Valery Piashchynski <[email protected]> | 2021-01-25 16:18:26 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-25 16:18:26 +0300 |
commit | f1875f5715bf7635e17697ae3513ba3d21e4e524 (patch) | |
tree | f9c0c3876ef542217a8bd7ff17f90bffc018132f /pkg/pool/supervisor_pool.go | |
parent | a063ad05b1cab8ec71eecc32f836efa4d431c6b8 (diff) | |
parent | 99bf203511b8af4be37186017e2e0c73a030d4f3 (diff) |
Merge pull request #429 from spiral/2.0
Release 2.0-dev
Diffstat (limited to 'pkg/pool/supervisor_pool.go')
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go new file mode 100755 index 00000000..2597b352 --- /dev/null +++ b/pkg/pool/supervisor_pool.go @@ -0,0 +1,222 @@ +package pool + +import ( + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/tools" +) + +const MB = 1024 * 1024 + +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck + +type Supervised interface { + Pool + // Start used to start watching process for all pool workers + Start() +} + +type supervised struct { + cfg *SupervisorConfig + events events.Handler + pool Pool + stopCh chan struct{} + mu *sync.RWMutex +} + +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ + cfg: cfg, + events: events, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), + } + + return sp +} + +type ttlExec struct { + err error + p payload.Payload +} + +func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) + defer cancel() + go func() { + res, err := sp.pool.ExecWithContext(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: payload.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + + return res.p, nil + } + } +} + +func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec") + rsp, err := sp.pool.Exec(p) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + return rsp, nil +} + +func (sp *supervised) GetConfig() interface{} { + return sp.pool.GetConfig() +} + +func (sp *supervised) Workers() (workers []worker.SyncWorker) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { + return sp.pool.RemoveWorker(worker) +} + +func (sp *supervised) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) +} + +func (sp *supervised) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.mu.Lock() + sp.control() + sp.mu.Unlock() + } + } + }() +} + +func (sp *supervised) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervised) control() { + now := time.Now() + const op = errors.Op("supervised_pool_control_tick") + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == internal.StateInvalid { + continue + } + + s, err := tools.WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != internal.StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds + // get last used unix nano + lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } + + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 + + // maxWorkerIdle more than diff between now and last used + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + } + } + } +} |