diff options
author | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
commit | 0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch) | |
tree | ffcb65010bebe9f5b5436192979e64b2402a6ec0 /pool_supervisor.go | |
parent | 08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff) |
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'pool_supervisor.go')
-rw-r--r-- | pool_supervisor.go | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/pool_supervisor.go b/pool_supervisor.go new file mode 100644 index 00000000..cadf5f9c --- /dev/null +++ b/pool_supervisor.go @@ -0,0 +1,176 @@ +package roadrunner + +import ( + "context" + "errors" + "fmt" + "time" +) + +const MB = 1024 * 1024 + +type Supervisor interface { + Attach(pool Pool) + StartWatching() error + StopWatching() + Detach() +} + +type staticPoolSupervisor struct { + // maxWorkerMemory in MB + maxWorkerMemory uint64 + // maxPoolMemory in MB + maxPoolMemory uint64 + // maxWorkerTTL in seconds + maxWorkerTTL uint64 + // maxWorkerIdle in seconds + maxWorkerIdle uint64 + + // watchTimeout in seconds + watchTimeout uint64 + stopCh chan struct{} + + pool Pool +} + +/* +The arguments are: +maxWorkerMemory - maximum memory allowed for a single worker +maxPoolMemory - maximum pool memory allowed for a pool of a workers +maxTtl - maximum ttl for the worker after which it will be killed and replaced +maxIdle - maximum time to live for the worker in Ready state +watchTimeout - time between watching for the workers/pool status +*/ +// TODO might be just wrap the pool and return ControlledPool with included Pool interface +func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor { + if maxWorkerMemory == 0 { + // just set to a big number, 5GB + maxPoolMemory = 5000 * MB + } + if watchTimeout == 0 { + watchTimeout = 60 + } + return &staticPoolSupervisor{ + maxWorkerMemory: maxWorkerMemory, + maxPoolMemory: maxPoolMemory, + maxWorkerTTL: maxTtl, + maxWorkerIdle: maxIdle, + stopCh: make(chan struct{}), + } +} + +func (sps *staticPoolSupervisor) Attach(pool Pool) { + sps.pool = pool +} + +func (sps *staticPoolSupervisor) StartWatching() error { + go func() { + watchTout := time.NewTicker(time.Second * time.Duration(sps.watchTimeout)) + for { + select { + case <-sps.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + err := sps.control() + if err != nil { + sps.pool.Events() <- PoolEvent{Payload: err} + } + } + } + }() + return nil +} + +func (sps *staticPoolSupervisor) StopWatching() { + sps.stopCh <- struct{}{} +} + +func (sps *staticPoolSupervisor) Detach() { + +} + +func (sps *staticPoolSupervisor) control() error { + if sps.pool == nil { + return errors.New("pool should be attached") + } + now := time.Now() + ctx := context.TODO() + + // THIS IS A COPY OF WORKERS + workers := sps.pool.Workers(ctx) + var totalUsedMemory uint64 + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == StateInvalid { + continue + } + + s, err := WorkerProcessState(workers[i]) + if err != nil { + panic(err) + // push to pool events?? + } + + if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) { + err = sps.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + return err + } + + // after remove worker we should exclude it from further analysis + workers = append(workers[:i], workers[i+1:]...) + } + + if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB { + // TODO events + sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)} + err = sps.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + return err + } + workers = append(workers[:i], workers[i+1:]...) + continue + } + + // firs we check maxWorker idle + if sps.maxWorkerIdle != 0 { + // then check for the worker state + if workers[i].State().Value() != StateReady { + continue + } + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + // get last used unix nano + lu := workers[i].State().LastUsed() + // convert last used to unixNano and sub time.now + res := int64(lu) - now.UnixNano() + // maxWorkerIdle more than diff between now and last used + if int64(sps.maxWorkerIdle)-res <= 0 { + sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)} + err = sps.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + return err + } + workers = append(workers[:i], workers[i+1:]...) + } + } + + // the very last step is to calculate pool memory usage (except excluded workers) + totalUsedMemory += s.MemoryUsage + } + + // if current usage more than max allowed pool memory usage + if totalUsedMemory > sps.maxPoolMemory { + // destroy pool + totalUsedMemory = 0 + sps.pool.Destroy(ctx) + } + + return nil +} |