diff options
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 74 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 6 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 9 | ||||
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 4 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 11 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 123 |
7 files changed, 85 insertions, 144 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index ab025fa1..e568661f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo return sp.execDebugWithTTL(ctx, p) } - ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxAlloc, op) if err != nil { diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index ca61dbc4..a1dd21ac 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) return sp } -type ttlExec struct { - err error - p payload.Payload -} - -func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised_exec_with_context") - if sp.cfg.ExecTTL == 0 { - return sp.pool.Exec(rqs) - } - - c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) - defer cancel() - go func() { - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - c <- ttlExec{ - err: errors.E(op, err), - p: payload.Payload{}, - } - } - - c <- ttlExec{ - err: nil, - p: res, - } - }() - - for { - select { - case <-ctx.Done(): - return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) - case res := <-c: - if res.err != nil { - return payload.Payload{}, res.err - } - - return res.p, nil - } - } +func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) { + panic("used to satisfy pool interface") } func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { @@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { return sp.pool.Exec(rqs) } - c := make(chan ttlExec, 1) ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) defer cancel() - go func() { - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - c <- ttlExec{ - err: errors.E(op, err), - p: payload.Payload{}, - } - } - c <- ttlExec{ - err: nil, - p: res, - } - }() - - for { - select { - case <-ctx.Done(): - return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) - case res := <-c: - if res.err != nil { - return payload.Payload{}, res.err - } - - return res.p, nil - } + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + return payload.Payload{}, errors.E(op, err) } + + return res, nil } func (sp *supervised) GetConfig() interface{} { diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index dc307c33..f371b925 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.execWithTTL(context.Background(), payload.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.execWithTTL(context.Background(), payload.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.execWithTTL(context.Background(), payload.Payload{ + _, err = p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 13e70f49..84ff5977 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -111,6 +111,15 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p return } + if tw.process.State().Value() != StateWorking { + tw.process.State().RegisterExec() + c <- wexec{ + payload: rsp, + err: nil, + } + return + } + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go index 532bace9..0e648d34 100644 --- a/pkg/worker_watcher/container/interface.go +++ b/pkg/worker_watcher/container/interface.go @@ -1,6 +1,8 @@ package container -import "github.com/spiral/roadrunner/v2/pkg/worker" +import ( + "github.com/spiral/roadrunner/v2/pkg/worker" +) // Vector interface represents vector container type Vector interface { diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 565b1b69..1ab9d073 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -7,12 +7,14 @@ import ( ) type Vec struct { + wqLen uint64 destroy uint64 workers chan worker.BaseProcess } func NewVector(initialNumOfWorkers uint64) Vector { vec := &Vec{ + wqLen: 0, destroy: 0, workers: make(chan worker.BaseProcess, initialNumOfWorkers), } @@ -21,6 +23,7 @@ func NewVector(initialNumOfWorkers uint64) Vector { } func (v *Vec) Enqueue(w worker.BaseProcess) { + atomic.AddUint64(&v.wqLen, 1) v.workers <- w } @@ -31,11 +34,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) { return true } */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { return nil, true } - return <-v.workers, false + if num := atomic.LoadUint64(&v.wqLen); num > 0 { + atomic.AddUint64(&v.wqLen, ^uint64(0)) + return <-v.workers, false + } + + return nil, false } func (v *Vec) Destroy() { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 108756fc..9d66a75c 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -47,88 +47,69 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } -// return value from Get -type get struct { - w worker.BaseProcess - err error -} - // Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { - c := make(chan get, 1) const op = errors.Op("worker_watcher_get_free_worker") - go func() { - // FAST PATH - // thread safe operation - w, stop := ww.container.Dequeue() - if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } - return - } - // fast path, worker not nil and in the ReadyState - if w.State().Value() == worker.StateReady { - c <- get{ - w, - nil, - } - return - } - // ========================================================= - // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container - // try to continuously get free one - for { - w, stop = ww.container.Dequeue() + for { + select { + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers) + default: + // thread safe operation + w, stop := ww.container.Dequeue() if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } + return nil, errors.E(op, errors.WatcherStopped) } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - c <- get{ - w, - nil, - } - return - case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker + if w == nil { continue } - } - }() - select { - case r := <-c: - if r.err != nil { - return nil, r.err + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + return w, nil + } + // ========================================================= + // SLOW PATH + _ = w.Kill() // how the worker get here??????? + // no free workers in the container + // try to continuously get free one + for { + w, stop = ww.container.Dequeue() + if stop { + return nil, errors.E(op, errors.WatcherStopped) + } + + if w == nil { + continue + } + + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + return w, nil + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue + } + } } - return r.w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed")) } } |