diff options
Diffstat (limited to 'supervisor_pool.go')
-rwxr-xr-x | supervisor_pool.go | 93 |
1 files changed, 83 insertions, 10 deletions
diff --git a/supervisor_pool.go b/supervisor_pool.go index 9d1d2b1e..0293ab8b 100755 --- a/supervisor_pool.go +++ b/supervisor_pool.go @@ -2,8 +2,10 @@ package roadrunner import ( "context" + "sync" "time" + "github.com/spiral/roadrunner/v2/errors" "github.com/spiral/roadrunner/v2/util" ) @@ -11,31 +13,100 @@ const MB = 1024 * 1024 type SupervisedPool interface { Pool - - // ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context - // deadline reached. - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) + Start() } type supervisedPool struct { - cfg SupervisorConfig + cfg *SupervisorConfig events *util.EventHandler pool Pool stopCh chan struct{} + mu *sync.RWMutex } -func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool { - return &supervisedPool{ +func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool { + sp := &supervisedPool{ cfg: cfg, events: events, pool: pool, + mu: &sync.RWMutex{}, stopCh: make(chan struct{}), } + return sp +} + +type ttlExec struct { + err error + p Payload +} + +func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { + const op = errors.Op("exec_supervised") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL)) + defer cancel() + go func() { + res, err := sp.pool.ExecWithContext(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: err, + p: EmptyPayload, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return EmptyPayload, res.err + } + + return res.p, nil + } + } +} + +func (sp *supervisedPool) Exec(p Payload) (Payload, error) { + return sp.pool.Exec(p) +} + +func (sp *supervisedPool) AddListener(listener util.EventListener) { + sp.pool.AddListener(listener) +} + +func (sp *supervisedPool) GetConfig() Config { + return sp.pool.GetConfig() +} + +func (sp *supervisedPool) Workers() (workers []WorkerBase) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error { + return sp.pool.RemoveWorker(ctx, worker) +} + +func (sp *supervisedPool) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) } func (sp *supervisedPool) Start() { go func() { - watchTout := time.NewTicker(sp.cfg.WatchTick) + watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick)) for { select { case <-sp.stopCh: @@ -43,7 +114,9 @@ func (sp *supervisedPool) Start() { return // stop here case <-watchTout.C: + sp.mu.Lock() sp.control() + sp.mu.Unlock() } } }() @@ -89,7 +162,7 @@ func (sp *supervisedPool) control() { sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) return } else { - sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]}) } continue @@ -116,7 +189,7 @@ func (sp *supervisedPool) control() { res := int64(lu) - now.UnixNano() // maxWorkerIdle more than diff between now and last used - if sp.cfg.IdleTTL-res <= 0 { + if sp.cfg.IdleTTL-uint64(res) <= 0 { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) |