summaryrefslogtreecommitdiff
path: root/supervisor_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'supervisor_pool.go')
-rwxr-xr-xsupervisor_pool.go93
1 files changed, 83 insertions, 10 deletions
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 9d1d2b1e..0293ab8b 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -2,8 +2,10 @@ package roadrunner
import (
"context"
+ "sync"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
)
@@ -11,31 +13,100 @@ const MB = 1024 * 1024
type SupervisedPool interface {
Pool
-
- // ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context
- // deadline reached.
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ Start()
}
type supervisedPool struct {
- cfg SupervisorConfig
+ cfg *SupervisorConfig
events *util.EventHandler
pool Pool
stopCh chan struct{}
+ mu *sync.RWMutex
}
-func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
- return &supervisedPool{
+func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool {
+ sp := &supervisedPool{
cfg: cfg,
events: events,
pool: pool,
+ mu: &sync.RWMutex{},
stopCh: make(chan struct{}),
}
+ return sp
+}
+
+type ttlExec struct {
+ err error
+ p Payload
+}
+
+func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("exec_supervised")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: err,
+ p: EmptyPayload,
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
+func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
+ return sp.pool.Exec(p)
+}
+
+func (sp *supervisedPool) AddListener(listener util.EventListener) {
+ sp.pool.AddListener(listener)
+}
+
+func (sp *supervisedPool) GetConfig() Config {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervisedPool) Workers() (workers []WorkerBase) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error {
+ return sp.pool.RemoveWorker(ctx, worker)
+}
+
+func (sp *supervisedPool) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
}
func (sp *supervisedPool) Start() {
go func() {
- watchTout := time.NewTicker(sp.cfg.WatchTick)
+ watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
for {
select {
case <-sp.stopCh:
@@ -43,7 +114,9 @@ func (sp *supervisedPool) Start() {
return
// stop here
case <-watchTout.C:
+ sp.mu.Lock()
sp.control()
+ sp.mu.Unlock()
}
}
}()
@@ -89,7 +162,7 @@ func (sp *supervisedPool) control() {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
return
} else {
- sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
}
continue
@@ -116,7 +189,7 @@ func (sp *supervisedPool) control() {
res := int64(lu) - now.UnixNano()
// maxWorkerIdle more than diff between now and last used
- if sp.cfg.IdleTTL-res <= 0 {
+ if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})