diff options
Diffstat (limited to 'pool_supervisor.go')
-rw-r--r-- | pool_supervisor.go | 182 |
1 files changed, 0 insertions, 182 deletions
diff --git a/pool_supervisor.go b/pool_supervisor.go deleted file mode 100644 index c0a6ecd9..00000000 --- a/pool_supervisor.go +++ /dev/null @@ -1,182 +0,0 @@ -package roadrunner - -import ( - "context" - "errors" - "fmt" - "time" -) - -const MB = 1024 * 1024 - -type Supervisor interface { - Attach(pool Pool) - StartWatching() error - StopWatching() - Detach() -} - -type staticPoolSupervisor struct { - // maxWorkerMemory in MB - maxWorkerMemory uint64 - // maxPoolMemory in MB - maxPoolMemory uint64 - // maxWorkerTTL in seconds - maxWorkerTTL uint64 - // maxWorkerIdle in seconds - maxWorkerIdle uint64 - - // watchTimeout in seconds - watchTimeout uint64 - stopCh chan struct{} - - pool Pool -} - -/* -The arguments are: -maxWorkerMemory - maximum memory allowed for a single worker -maxPoolMemory - maximum pool memory allowed for a pool of a workers -maxTtl - maximum ttl for the worker after which it will be killed and replaced -maxIdle - maximum time to live for the worker in Ready state -watchTimeout - time between watching for the workers/pool status -*/ -// TODO might be just wrap the pool and return ControlledPool with included Pool interface -func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor { - if maxWorkerMemory == 0 { - // just set to a big number, 5GB - maxPoolMemory = 5000 * MB - } - - if watchTimeout == 0 { - watchTimeout = 60 - } - - return &staticPoolSupervisor{ - maxWorkerMemory: maxWorkerMemory, - maxPoolMemory: maxPoolMemory, - maxWorkerTTL: maxTtl, - maxWorkerIdle: maxIdle, - watchTimeout: watchTimeout, - stopCh: make(chan struct{}), - } -} - -func (sps *staticPoolSupervisor) Attach(pool Pool) { - sps.pool = pool -} - -func (sps *staticPoolSupervisor) StartWatching() error { - go func() { - watchTout := time.NewTicker(time.Second * time.Duration(sps.watchTimeout)) - for { - select { - case <-sps.stopCh: - watchTout.Stop() - return - // stop here - case <-watchTout.C: - err := sps.control() - if err != nil { - sps.pool.Events() <- PoolEvent{Payload: err} - } - } - } - }() - return nil -} - -func (sps *staticPoolSupervisor) StopWatching() { - sps.stopCh <- struct{}{} -} - -func (sps *staticPoolSupervisor) Detach() { - -} - -func (sps *staticPoolSupervisor) control() error { - if sps.pool == nil { - return errors.New("pool should be attached") - } - now := time.Now() - ctx := context.TODO() - - // THIS IS A COPY OF WORKERS - workers := sps.pool.Workers() - totalUsedMemory := uint64(0) - - for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == StateInvalid { - continue - } - - s, err := WorkerProcessState(workers[i]) - if err != nil { - err2 := sps.pool.RemoveWorker(ctx, workers[i]) - if err2 != nil { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)} - return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2) - } - sps.pool.Events() <- PoolEvent{Payload: err} - return err - } - - if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) { - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - - // after remove worker we should exclude it from further analysis - workers = append(workers[:i], workers[i+1:]...) - } - - if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB { - // TODO events - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - continue - } - - // firs we check maxWorker idle - if sps.maxWorkerIdle != 0 { - // then check for the worker state - if workers[i].State().Value() != StateReady { - continue - } - /* - Calculate idle time - If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed - */ - // get last used unix nano - lu := workers[i].State().LastUsed() - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() - // maxWorkerIdle more than diff between now and last used - if int64(sps.maxWorkerIdle)-res <= 0 { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - } - } - - // the very last step is to calculate pool memory usage (except excluded workers) - totalUsedMemory += s.MemoryUsage - } - - // if current usage more than max allowed pool memory usage - if totalUsedMemory > sps.maxPoolMemory { - sps.pool.Destroy(ctx) - } - - return nil -} |