author    Valery Piashchynski <[email protected]>    2020-10-13 13:55:20 +0300
committer Valery Piashchynski <[email protected]>    2020-10-13 13:55:20 +0300
commit    0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch)
tree      ffcb65010bebe9f5b5436192979e64b2402a6ec0 /pool_supervisor.go
parent    08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff)
Initial commit of RR 2.0 (tag: v2.0.0-alpha1)
Diffstat (limited to 'pool_supervisor.go')
-rw-r--r--    pool_supervisor.go    176
1 file changed, 176 insertions(+), 0 deletions(-)
diff --git a/pool_supervisor.go b/pool_supervisor.go
new file mode 100644
index 00000000..cadf5f9c
--- /dev/null
+++ b/pool_supervisor.go
@@ -0,0 +1,176 @@
+package roadrunner
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+)
+
+const MB = 1024 * 1024
+
+type Supervisor interface {
+	Attach(pool Pool)
+	StartWatching() error
+	StopWatching()
+	Detach()
+}
+
+type staticPoolSupervisor struct {
+	// maxWorkerMemory in MB
+	maxWorkerMemory uint64
+	// maxPoolMemory in MB
+	maxPoolMemory uint64
+	// maxWorkerTTL in seconds
+	maxWorkerTTL uint64
+	// maxWorkerIdle in seconds
+	maxWorkerIdle uint64
+
+	// watchTimeout in seconds
+	watchTimeout uint64
+	stopCh       chan struct{}
+
+	pool Pool
+}
+
+/*
+The arguments are:
+maxWorkerMemory - maximum memory allowed for a single worker
+maxPoolMemory - maximum memory allowed for the whole pool of workers
+maxTTL - maximum TTL for a worker, after which it will be killed and replaced
+maxIdle - maximum time a worker is allowed to stay in the Ready state before it is killed
+watchTimeout - interval between checks of the workers/pool status
+*/
+// TODO: it might be better to just wrap the pool and return a ControlledPool that embeds the Pool interface
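+//
+// Example (illustrative values only; memory is given in MB, times in seconds,
+// matching the struct comments above):
+//
+//	s := NewStaticPoolSupervisor(128, 1024, 3600, 300, 10)
+//	s.Attach(pool)
+//	err := s.StartWatching()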
+func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTTL, maxIdle, watchTimeout uint64) Supervisor {
+	if maxWorkerMemory == 0 {
+		// just set to a big number: 5000MB (~5GB)
+		maxWorkerMemory = 5000
+	}
+	if watchTimeout == 0 {
+		watchTimeout = 60
+	}
+	return &staticPoolSupervisor{
+		maxWorkerMemory: maxWorkerMemory,
+		maxPoolMemory:   maxPoolMemory,
+		maxWorkerTTL:    maxTTL,
+		maxWorkerIdle:   maxIdle,
+		watchTimeout:    watchTimeout,
+		stopCh:          make(chan struct{}),
+	}
+}
+
+func (sps *staticPoolSupervisor) Attach(pool Pool) {
+	sps.pool = pool
+}
+
+func (sps *staticPoolSupervisor) StartWatching() error {
+	go func() {
+		watchTout := time.NewTicker(time.Second * time.Duration(sps.watchTimeout))
+		for {
+			select {
+			// stop the watcher goroutine
+			case <-sps.stopCh:
+				watchTout.Stop()
+				return
+			case <-watchTout.C:
+				err := sps.control()
+				if err != nil {
+					sps.pool.Events() <- PoolEvent{Payload: err}
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+func (sps *staticPoolSupervisor) StopWatching() {
+	sps.stopCh <- struct{}{}
+}
+
+func (sps *staticPoolSupervisor) Detach() {}
+
+func (sps *staticPoolSupervisor) control() error {
+	if sps.pool == nil {
+		return errors.New("pool should be attached")
+	}
+	now := time.Now()
+	ctx := context.TODO()
+
+	// NOTE: this is a copy of the workers slice
+	workers := sps.pool.Workers(ctx)
+	var totalUsedMemory uint64
+
+	for i := 0; i < len(workers); i++ {
+		// skip workers that are already invalid
+		if workers[i].State().Value() == StateInvalid {
+			continue
+		}
+
+		s, err := WorkerProcessState(workers[i])
+		if err != nil {
+			// push the error to the pool events instead of panicking
+			sps.pool.Events() <- PoolEvent{Payload: err}
+			continue
+		}
+
+		if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) {
+			err = sps.pool.RemoveWorker(ctx, workers[i])
+			if err != nil {
+				return err
+			}
+
+			// after removing the worker we have to exclude it from further analysis
+			workers = append(workers[:i], workers[i+1:]...)
+			// index i now holds the next worker, re-check it on the next iteration
+			i--
+			continue
+		}
+
+		if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB {
+			sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)}
+			err = sps.pool.RemoveWorker(ctx, workers[i])
+			if err != nil {
+				return err
+			}
+			workers = append(workers[:i], workers[i+1:]...)
+			i--
+			continue
+		}
+
+		// finally, check the worker idle time; only a worker in the Ready state can be idle
+		if sps.maxWorkerIdle != 0 && workers[i].State().Value() == StateReady {
+			/*
+				Calculate the idle time:
+				read the worker's LastUsed timestamp (UnixNano, uint64) and subtract it from time.Now.
+				For example, if maxWorkerIdle equals 5s and (time.Now - LastUsed) > maxWorkerIdle,
+				the worker has exceeded its allowed idle time and has to be killed.
+			*/
+			// elapsed nanoseconds since the worker was last used
+			idle := now.UnixNano() - int64(workers[i].State().LastUsed())
+			// maxWorkerIdle is in seconds, convert it to nanoseconds before comparing
+			if idle >= int64(sps.maxWorkerIdle)*int64(time.Second) {
+				sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %vs, max idle time: %vs", idle/int64(time.Second), sps.maxWorkerIdle)}
+				err = sps.pool.RemoveWorker(ctx, workers[i])
+				if err != nil {
+					return err
+				}
+				workers = append(workers[:i], workers[i+1:]...)
+				i--
+				continue
+			}
+		}
+
+		// the very last step is to accumulate pool memory usage (removed workers excluded)
+		totalUsedMemory += s.MemoryUsage
+	}
+
+	// if current usage exceeds the max allowed pool memory, destroy the pool
+	if sps.maxPoolMemory != 0 && totalUsedMemory > sps.maxPoolMemory*MB {
+		sps.pool.Destroy(ctx)
+	}
+
+	return nil
+}
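
A minimal lifecycle sketch of the supervisor introduced in this commit. It assumes `p` is an initialized Pool created elsewhere; the constructor arguments (memory in MB, times in seconds) are illustrative, not values taken from this patch:

	// attach the supervisor to an existing pool and start the watcher
	sup := NewStaticPoolSupervisor(128, 1024, 3600, 300, 10)
	sup.Attach(p)
	if err := sup.StartWatching(); err != nil {
		panic(err)
	}

	// supervisor errors surface on the pool's event channel
	go func() {
		for ev := range p.Events() {
			fmt.Println(ev.Payload)
		}
	}()

	// ... handle requests ...

	// stop the watcher goroutine
	sup.StopWatching()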