diff options
Diffstat (limited to 'pool/supervisor_pool.go')
-rwxr-xr-x | pool/supervisor_pool.go | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go new file mode 100755 index 00000000..99af168c --- /dev/null +++ b/pool/supervisor_pool.go @@ -0,0 +1,230 @@ +package pool + +import ( + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/state/process" + "github.com/spiral/roadrunner/v2/worker" +) + +const MB = 1024 * 1024 + +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck + +type Supervised interface { + Pool + // Start used to start watching process for all pool workers + Start() +} + +type supervised struct { + cfg *SupervisorConfig + events events.Handler + pool Pool + stopCh chan struct{} + mu *sync.RWMutex +} + +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ + cfg: cfg, + events: events, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), + } + + return sp +} + +func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { + panic("used to satisfy pool interface") +} + +func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) + defer cancel() + + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + return nil, errors.E(op, err) + } + + return res, nil +} + +func (sp *supervised) GetConfig() interface{} { + return sp.pool.GetConfig() +} + +func (sp *supervised) Workers() (workers []worker.BaseProcess) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { + return sp.pool.RemoveWorker(worker) +} + +func (sp *supervised) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) +} + +func (sp *supervised) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.mu.Lock() + sp.control() + sp.mu.Unlock() + } + } + }() +} + +func (sp *supervised) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervised) control() { //nolint:gocognit + now := time.Now() + + // MIGHT BE OUTDATED + // It's a copy of the Workers pointers + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + // if worker not in the Ready OR working state + // skip such worker + switch workers[i].State().Value() { + case + worker.StateInvalid, + worker.StateErrored, + worker.StateDestroyed, + worker.StateInactive, + worker.StateStopped, + worker.StateStopping, + worker.StateKilling: + continue + } + + s, err := process.WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + // just to double check + workers[i].State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + // just to double check + workers[i].State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != worker.StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds + // get last used unix nano + lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } + + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 + + // maxWorkerIdle more than diff between now and last used + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + // just to double-check + workers[i].State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + } + } + } +} |