summaryrefslogtreecommitdiff
path: root/pkg/pool/supervisor_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool/supervisor_pool.go')
-rwxr-xr-xpkg/pool/supervisor_pool.go230
1 files changed, 0 insertions, 230 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
deleted file mode 100755
index e6b2bd7c..00000000
--- a/pkg/pool/supervisor_pool.go
+++ /dev/null
@@ -1,230 +0,0 @@
-package pool
-
-import (
- "context"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/state/process"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-const MB = 1024 * 1024
-
-// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
-
-type Supervised interface {
- Pool
- // Start used to start watching process for all pool workers
- Start()
-}
-
-type supervised struct {
- cfg *SupervisorConfig
- events events.Handler
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
-}
-
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
- sp := &supervised{
- cfg: cfg,
- events: events,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
- }
-
- return sp
-}
-
-func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
- panic("used to satisfy pool interface")
-}
-
-func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("supervised_exec_with_context")
- if sp.cfg.ExecTTL == 0 {
- return sp.pool.Exec(rqs)
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
- defer cancel()
-
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return res, nil
-}
-
-func (sp *supervised) GetConfig() interface{} {
- return sp.pool.GetConfig()
-}
-
-func (sp *supervised) Workers() (workers []worker.BaseProcess) {
- sp.mu.Lock()
- defer sp.mu.Unlock()
- return sp.pool.Workers()
-}
-
-func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
- return sp.pool.RemoveWorker(worker)
-}
-
-func (sp *supervised) Destroy(ctx context.Context) {
- sp.pool.Destroy(ctx)
-}
-
-func (sp *supervised) Start() {
- go func() {
- watchTout := time.NewTicker(sp.cfg.WatchTick)
- for {
- select {
- case <-sp.stopCh:
- watchTout.Stop()
- return
- // stop here
- case <-watchTout.C:
- sp.mu.Lock()
- sp.control()
- sp.mu.Unlock()
- }
- }
- }()
-}
-
-func (sp *supervised) Stop() {
- sp.stopCh <- struct{}{}
-}
-
-func (sp *supervised) control() { //nolint:gocognit
- now := time.Now()
-
- // MIGHT BE OUTDATED
- // It's a copy of the Workers pointers
- workers := sp.pool.Workers()
-
- for i := 0; i < len(workers); i++ {
- // if worker not in the Ready OR working state
- // skip such worker
- switch workers[i].State().Value() {
- case
- worker.StateInvalid,
- worker.StateErrored,
- worker.StateDestroyed,
- worker.StateInactive,
- worker.StateStopped,
- worker.StateStopping,
- worker.StateKilling:
- continue
- }
-
- s, err := process.WorkerProcessState(workers[i])
- if err != nil {
- // worker not longer valid for supervision
- continue
- }
-
- if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- /*
- worker at this point might be in the middle of request execution:
-
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
- ^
- TTL Reached, state - invalid |
- -----> Worker Stopped here
- */
-
- if workers[i].State().Value() != worker.StateWorking {
- workers[i].State().Set(worker.StateInvalid)
- _ = workers[i].Stop()
- }
- // just to double check
- workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
- continue
- }
-
- if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- /*
- worker at this point might be in the middle of request execution:
-
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
- ^
- TTL Reached, state - invalid |
- -----> Worker Stopped here
- */
-
- if workers[i].State().Value() != worker.StateWorking {
- workers[i].State().Set(worker.StateInvalid)
- _ = workers[i].Stop()
- }
- // just to double check
- workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
- continue
- }
-
- // firs we check maxWorker idle
- if sp.cfg.IdleTTL != 0 {
- // then check for the worker state
- if workers[i].State().Value() != worker.StateReady {
- continue
- }
-
- /*
- Calculate idle time
- If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
- 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
- we are guessing that worker overlap idle time and has to be killed
- */
-
- // 1610530005534416045 lu
- // lu - now = -7811150814 - nanoseconds
- // 7.8 seconds
- // get last used unix nano
- lu := workers[i].State().LastUsed()
- // worker not used, skip
- if lu == 0 {
- continue
- }
-
- // convert last used to unixNano and sub time.now to seconds
- // negative number, because lu always in the past, except for the `back to the future` :)
- res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
-
- // maxWorkerIdle more than diff between now and last used
- // for example:
- // After exec worker goes to the rest
- // And resting for the 5 seconds
- // IdleTTL is 1 second.
- // After the control check, res will be 5, idle is 1
- // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
- if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
- /*
- worker at this point might be in the middle of request execution:
-
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
- ^
- TTL Reached, state - invalid |
- -----> Worker Stopped here
- */
-
- if workers[i].State().Value() != worker.StateWorking {
- workers[i].State().Set(worker.StateInvalid)
- _ = workers[i].Stop()
- }
- // just to double-check
- workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
- }
- }
- }
-}