path: root/pkg/pool/supervisor_pool.go
author    Valery Piashchynski <[email protected]>    2020-12-17 12:13:55 +0300
committer GitHub <[email protected]>    2020-12-17 12:13:55 +0300
commit    ee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (patch)
tree      2c99d4c6e2b2e9e3fa155d5d68a9d471c9aeeb9b /pkg/pool/supervisor_pool.go
parent    a1dc59cabb6e63eab232922f4eb5a19dbd168f44 (diff)
parent    edf924b37bcdad14eb31014c571ab58720aa178f (diff)
Merge pull request #452 from spiral/refactor/split (tag: v2.0.0-alpha23)
Refactor/split
Diffstat (limited to 'pkg/pool/supervisor_pool.go')
-rwxr-xr-x  pkg/pool/supervisor_pool.go  208
1 file changed, 208 insertions, 0 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
new file mode 100755
index 00000000..6d1f0c58
--- /dev/null
+++ b/pkg/pool/supervisor_pool.go
@@ -0,0 +1,208 @@
+package pool
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
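+// MB is used to convert the MaxWorkerMemory limit, configured in megabytes, into bytes.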
+const MB = 1024 * 1024
+
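+// Supervised is a pool.Pool whose workers are additionally watched for TTL, idle and memory limits.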
+type Supervised interface {
+ pool.Pool
+ // Start starts the watching process for all the pool workers
+ Start()
+}
+
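+// supervised wraps a pool.Pool and runs a periodic control loop (driven by cfg.WatchTick) over its workers.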
+type supervised struct {
+ cfg *SupervisorConfig
+ events events.Handler
+ pool pool.Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
+}
+
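+// newPoolWatcher wraps the given pool into a supervised pool. The watcher is
+// not started automatically; call Start() to launch the control loop.
+//
+// Rough usage sketch (the pool, events handler, config and payload below are
+// illustrative assumptions, not part of this file):
+//
+//	sup := newPoolWatcher(basePool, eventsHandler, supervisorCfg)
+//	sup.Start()
+//	rsp, err := sup.ExecWithContext(ctx, payload)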
+func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+ sp := &supervised{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
+ }
+ return sp
+}
+
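+// ttlExec carries the result (or error) of a single pool exec through the result channel.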
+type ttlExec struct {
+ err error
+ p internal.Payload
+}
+
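+// ExecWithContext executes the payload on the underlying pool. When cfg.ExecTTL is non-zero,
+// the call is bounded by a timeout of ExecTTL seconds and returns a TimeOut error once the
+// deadline is exceeded; with ExecTTL == 0 it falls back to a plain Exec.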
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("exec_supervised")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: errors.E(op, err),
+ p: internal.Payload{},
+ }
+ // return after sending the error; otherwise the send below would block on the full buffered channel and leak this goroutine
+ return
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return internal.Payload{}, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
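+// Exec proxies the payload to the underlying pool without applying an exec timeout.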
+func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("supervised exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+ return rsp, nil
+}
+
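+// AddListener registers the event listener on the underlying pool.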
+func (sp *supervised) AddListener(listener events.EventListener) {
+ sp.pool.AddListener(listener)
+}
+
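+// GetConfig returns the configuration of the underlying pool.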
+func (sp *supervised) GetConfig() interface{} {
+ return sp.pool.GetConfig()
+}
+
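+// Workers returns a copy of the worker list of the underlying pool; the mutex keeps the call from racing with the control loop.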
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
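+// RemoveWorker removes the given worker from the underlying pool.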
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+ return sp.pool.RemoveWorker(worker)
+}
+
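+// Destroy destroys the underlying pool.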
+func (sp *supervised) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
+}
+
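+// Start launches the control goroutine, which supervises the workers every WatchTick seconds until Stop is called.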
+func (sp *supervised) Start() {
+ go func() {
+ watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.mu.Lock()
+ sp.control()
+ sp.mu.Unlock()
+ }
+ }
+ }()
+}
+
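+// Stop signals the control goroutine to exit; the send blocks until the goroutine started by Start receives it.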
+func (sp *supervised) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
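+// control performs a single supervision pass: workers that exceed TTL, MaxWorkerMemory or IdleTTL
+// are removed from the pool and the corresponding pool event is pushed.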
+func (sp *supervised) control() {
+ now := time.Now()
+ const op = errors.Op("supervised pool control tick")
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == internal.StateInvalid {
+ continue
+ }
+
+ s, err := roadrunner.WorkerProcessState(workers[i])
+ if err != nil {
+ // the worker is no longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ continue
+ }
+
+ // first, check the worker idle TTL
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != internal.StateReady {
+ continue
+ }
+
+ /*
+ Calculate the idle time:
+ 1. If the worker is in StateReady, read its LastUsed timestamp (UnixNano, uint64).
+ 2. If, for example, IdleTTL is 5 seconds and (time.Now - LastUsed) > IdleTTL,
+ the worker has exceeded its allowed idle time and has to be removed.
+ */
+
+ // get the last used timestamp (unix nano)
+ lu := workers[i].State().LastUsed()
+
+ // seconds elapsed since the worker was last used
+ idleSec := (now.UnixNano() - int64(lu)) / int64(time.Second)
+
+ // remove the worker when it has been idle for at least IdleTTL seconds
+ if idleSec >= int64(sp.cfg.IdleTTL) {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+}