author    | Valery Piashchynski <[email protected]> | 2020-12-17 12:13:55 +0300
committer | GitHub <[email protected]> | 2020-12-17 12:13:55 +0300
commit    | ee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (patch)
tree      | 2c99d4c6e2b2e9e3fa155d5d68a9d471c9aeeb9b /pkg/pool/supervisor_pool.go
parent    | a1dc59cabb6e63eab232922f4eb5a19dbd168f44 (diff)
parent    | edf924b37bcdad14eb31014c571ab58720aa178f (diff)
Merge pull request #452 from spiral/refactor/split (v2.0.0-alpha23)
Refactor/split
Diffstat (limited to 'pkg/pool/supervisor_pool.go')
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 208
1 file changed, 208 insertions, 0 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
new file mode 100755
index 00000000..6d1f0c58
--- /dev/null
+++ b/pkg/pool/supervisor_pool.go
@@ -0,0 +1,208 @@
+package pool
+
+import (
+    "context"
+    "sync"
+    "time"
+
+    "github.com/spiral/errors"
+    "github.com/spiral/roadrunner/v2"
+    "github.com/spiral/roadrunner/v2/interfaces/events"
+    "github.com/spiral/roadrunner/v2/interfaces/pool"
+    "github.com/spiral/roadrunner/v2/interfaces/worker"
+    "github.com/spiral/roadrunner/v2/internal"
+)
+
+const MB = 1024 * 1024
+
+type Supervised interface {
+    pool.Pool
+    // Start starts the watch process for all pool workers
+    Start()
+}
+
+type supervised struct {
+    cfg    *SupervisorConfig
+    events events.Handler
+    pool   pool.Pool
+    stopCh chan struct{}
+    mu     *sync.RWMutex
+}
+
+func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+    sp := &supervised{
+        cfg:    cfg,
+        events: events,
+        pool:   pool,
+        mu:     &sync.RWMutex{},
+        stopCh: make(chan struct{}),
+    }
+    return sp
+}
+
+type ttlExec struct {
+    err error
+    p   internal.Payload
+}
+
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+    const op = errors.Op("exec_supervised")
+    if sp.cfg.ExecTTL == 0 {
+        return sp.pool.Exec(rqs)
+    }
+
+    c := make(chan ttlExec, 1)
+    ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+    defer cancel()
+    go func() {
+        res, err := sp.pool.ExecWithContext(ctx, rqs)
+        if err != nil {
+            c <- ttlExec{
+                err: errors.E(op, err),
+                p:   internal.Payload{},
+            }
+        }
+
+        c <- ttlExec{
+            err: nil,
+            p:   res,
+        }
+    }()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+        case res := <-c:
+            if res.err != nil {
+                return internal.Payload{}, res.err
+            }
+
+            return res.p, nil
+        }
+    }
+}
+
+func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
+    const op = errors.Op("supervised exec")
+    rsp, err := sp.pool.Exec(p)
+    if err != nil {
+        return internal.Payload{}, errors.E(op, err)
+    }
+    return rsp, nil
+}
+
+func (sp *supervised) AddListener(listener events.EventListener) {
+    sp.pool.AddListener(listener)
+}
+
+func (sp *supervised) GetConfig() interface{} {
+    return sp.pool.GetConfig()
+}
+
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+    sp.mu.Lock()
+    defer sp.mu.Unlock()
+    return sp.pool.Workers()
+}
+
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+    return sp.pool.RemoveWorker(worker)
+}
+
+func (sp *supervised) Destroy(ctx context.Context) {
+    sp.pool.Destroy(ctx)
+}
+
+func (sp *supervised) Start() {
+    go func() {
+        watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
+        for {
+            select {
+            case <-sp.stopCh:
+                watchTout.Stop()
+                return
+            // stop here
+            case <-watchTout.C:
+                sp.mu.Lock()
+                sp.control()
+                sp.mu.Unlock()
+            }
+        }
+    }()
+}
+
+func (sp *supervised) Stop() {
+    sp.stopCh <- struct{}{}
+}
+
+func (sp *supervised) control() {
+    now := time.Now()
+    const op = errors.Op("supervised pool control tick")
+
+    // THIS IS A COPY OF WORKERS
+    workers := sp.pool.Workers()
+
+    for i := 0; i < len(workers); i++ {
+        if workers[i].State().Value() == internal.StateInvalid {
+            continue
+        }
+
+        s, err := roadrunner.WorkerProcessState(workers[i])
+        if err != nil {
+            // worker is no longer valid for supervision
+            continue
+        }
+
+        if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+            err = sp.pool.RemoveWorker(workers[i])
+            if err != nil {
+                sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+                return
+            }
+            sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+            continue
+        }
+
+        if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+            err = sp.pool.RemoveWorker(workers[i])
+            if err != nil {
+                sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+                return
+            }
+            sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+            continue
+        }
+
+        // first we check maxWorker idle
+        if sp.cfg.IdleTTL != 0 {
+            // then check for the worker state
+            if workers[i].State().Value() != internal.StateReady {
+                continue
+            }
+
+            /*
+                Calculate idle time:
+                1. If the worker is in StateReady, read its LastUsed timestamp (UnixNano, uint64).
+                2. For example, if maxWorkerIdle is 5s and (time.Now - LastUsed) > maxWorkerIdle,
+                   the worker has exceeded its idle time and has to be killed.
+            */
+
+            // get last used unix nano
+            lu := workers[i].State().LastUsed()
+
+            // convert last used to unixNano and sub time.now
+            res := int64(lu) - now.UnixNano()
+
+            // maxWorkerIdle more than diff between now and last used
+            if sp.cfg.IdleTTL-uint64(res) <= 0 {
+                err = sp.pool.RemoveWorker(workers[i])
+                if err != nil {
+                    sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+                    return
+                }
+                sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+            }
+        }
+    }
+}
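A note on the `ExecWithContext` goroutine above: when `sp.pool.ExecWithContext` returns an error, the error send to `c` is not followed by a `return`, so the goroutine performs a second send. The channel holds one value; if the caller has already returned on `ctx.Done()` without draining it, the second send blocks and the goroutine leaks, and otherwise a stale second result is left in the buffer. Both `select` cases also return, so the surrounding `for` loop never iterates twice. Below is a minimal, self-contained sketch of the same exec-with-TTL pattern with an early return after the error send; `payload`, `execFn`, `ttlResult` and `execWithTTL` are simplified stand-ins invented for illustration, not the package's `internal.Payload` or pool API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// payload is a simplified stand-in for internal.Payload.
type payload struct {
	Body []byte
}

// execFn stands in for the wrapped pool's ExecWithContext method.
type execFn func(ctx context.Context, rqs payload) (payload, error)

type ttlResult struct {
	p   payload
	err error
}

// execWithTTL runs exec with a deadline. The goroutine performs exactly one
// send per call (note the return after the error send), so it can never block
// on the one-slot channel, even when the caller leaves via ctx.Done().
func execWithTTL(ctx context.Context, exec execFn, rqs payload, ttl time.Duration) (payload, error) {
	if ttl == 0 {
		return exec(ctx, rqs)
	}

	ctx, cancel := context.WithTimeout(ctx, ttl)
	defer cancel()

	c := make(chan ttlResult, 1)
	go func() {
		res, err := exec(ctx, rqs)
		if err != nil {
			c <- ttlResult{err: err}
			return // without this return a second, stale result would be sent
		}
		c <- ttlResult{p: res}
	}()

	select {
	case <-ctx.Done():
		return payload{}, fmt.Errorf("exec timeout: %w", ctx.Err())
	case res := <-c:
		return res.p, res.err
	}
}

func main() {
	// A worker that needs 2s; the 1s TTL wins.
	slow := func(ctx context.Context, rqs payload) (payload, error) {
		select {
		case <-time.After(2 * time.Second):
			return payload{Body: []byte("done")}, nil
		case <-ctx.Done():
			return payload{}, errors.New("interrupted")
		}
	}
	_, err := execWithTTL(context.Background(), slow, payload{}, time.Second)
	fmt.Println(err) // usually "exec timeout: context deadline exceeded"
}
```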
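The idle check in `control` subtracts `now.UnixNano()` from the `LastUsed` timestamp, giving a negative nanosecond value, converts it to `uint64`, and subtracts that from `sp.cfg.IdleTTL` (configured in seconds); the `<= 0` test on the unsigned result therefore depends on wraparound rather than on the actual idle duration and is unlikely to behave the way the block comment describes. A small sketch of the same comparison done in like units is below; `idleExceeded`, `lastUsedNano` and `idleTTLSeconds` are hypothetical names standing in for `workers[i].State().LastUsed()` and `sp.cfg.IdleTTL`, not the package API.

```go
package main

import (
	"fmt"
	"time"
)

// idleExceeded reports whether a worker has been idle longer than the
// configured limit. Both sides of the comparison are time.Duration values,
// so seconds are never compared against raw nanosecond counts.
func idleExceeded(now time.Time, lastUsedNano uint64, idleTTLSeconds uint64) bool {
	if idleTTLSeconds == 0 {
		return false // idle supervision disabled
	}
	lastUsed := time.Unix(0, int64(lastUsedNano))
	idle := now.Sub(lastUsed)
	return idle >= time.Duration(idleTTLSeconds)*time.Second
}

func main() {
	now := time.Now()
	lastUsed := uint64(now.Add(-10 * time.Second).UnixNano())
	fmt.Println(idleExceeded(now, lastUsed, 5)) // true: idle ~10s against a 5s limit
}
```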
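For reference, the `Start`/`Stop` pair reduces to a ticker-plus-stop-channel supervision loop: a goroutine wakes every `WatchTick` seconds, takes the mutex, and runs one control pass, while an unbuffered send on `stopCh` ends it (so `Stop` blocks until the goroutine has received the signal). A self-contained sketch of that pattern follows; `watcher`, `start` and `stop` are hypothetical stand-ins for the supervised pool type.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// watcher is a reduction of the supervised pool: a mutex guarding the
// control pass, and a stop channel that ends the watch goroutine.
type watcher struct {
	mu     sync.Mutex
	stopCh chan struct{}
}

// start launches the watch loop: one control pass per tick, under the mutex.
func (w *watcher) start(tick time.Duration, control func()) {
	go func() {
		t := time.NewTicker(tick)
		defer t.Stop() // the ticker is released on any exit path
		for {
			select {
			case <-w.stopCh:
				return
			case <-t.C:
				w.mu.Lock()
				control()
				w.mu.Unlock()
			}
		}
	}()
}

// stop blocks until the watch goroutine has received the signal.
func (w *watcher) stop() {
	w.stopCh <- struct{}{}
}

func main() {
	w := &watcher{stopCh: make(chan struct{})}
	w.start(100*time.Millisecond, func() { fmt.Println("control tick") })
	time.Sleep(350 * time.Millisecond) // allow a few ticks
	w.stop()
}
```

Releasing the ticker with `defer t.Stop()` inside the goroutine keeps the shutdown path in one place instead of stopping it only in the stop branch, as the committed code does.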