summaryrefslogtreecommitdiff
path: root/pool_watcher.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2020-10-25 15:55:51 +0300
committerWolfy-J <[email protected]>2020-10-25 15:55:51 +0300
commitba5c562f9038ba434e655fb82c44597fcccaff16 (patch)
treeff112b9dcffda63bc40094a57d0df61622368445 /pool_watcher.go
parent3bdf7d02d83d1ff4726f3fbb01a45d016f39abec (diff)
- massive update in roadrunner 2.0 abstractions
Diffstat (limited to 'pool_watcher.go')
-rw-r--r--pool_watcher.go131
1 files changed, 131 insertions, 0 deletions
diff --git a/pool_watcher.go b/pool_watcher.go
new file mode 100644
index 00000000..6eb614dc
--- /dev/null
+++ b/pool_watcher.go
@@ -0,0 +1,131 @@
+package roadrunner
+
+import (
+ "context"
+ "github.com/spiral/roadrunner/v2/util"
+ "time"
+)
+
+const MB = 1024 * 1024
+
+type supervisedPool struct {
+ cfg SupervisorConfig
+ events *util.EventHandler
+ pool Pool
+ stopCh chan struct{}
+}
+
+func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
+ return &supervisedPool{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ stopCh: make(chan struct{}),
+ }
+}
+
+func (sp *supervisedPool) Start() {
+ go func() {
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.control()
+ }
+ }
+ }()
+}
+
+func (sp *supervisedPool) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervisedPool) control() {
+ now := time.Now()
+ ctx := context.TODO()
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == StateInvalid {
+ continue
+ }
+
+ s, err := WorkerProcessState(workers[i])
+ if err != nil {
+ // worker not longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ }
+
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ // TODO events
+ //sp.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sp.maxWorkerMemory)}
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ }
+
+ continue
+ }
+
+ // firs we check maxWorker idle
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != StateReady {
+ continue
+ }
+
+ /*
+ Calculate idle time
+ If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
+ 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
+ we are guessing that worker overlap idle time and has to be killed
+ */
+
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+
+ // convert last used to unixNano and sub time.now
+ res := int64(lu) - now.UnixNano()
+
+ // maxWorkerIdle more than diff between now and last used
+ if sp.cfg.IdleTTL-res <= 0 {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+
+ // the very last step is to calculate pool memory usage (except excluded workers)
+ //totalUsedMemory += s.MemoryUsage
+ }
+
+ //// if current usage more than max allowed pool memory usage
+ //if totalUsedMemory > sp.maxPoolMemory {
+ // sp.pool.Destroy(ctx)
+ //}
+}