Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 240
1 file changed, 153 insertions(+), 87 deletions(-)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 2380c190..3e0633a3 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,33 +3,42 @@ package worker_watcher //nolint:golint,stylecheck
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	"github.com/spiral/roadrunner/v2/pkg/worker"
+	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
 )
 
-// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+// workerCreateFunc can be nil, but in that case, dead container will not be replaced
 func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
 	ww := &workerWatcher{
-		stack:     NewWorkersStack(numWorkers),
-		allocator: allocator,
-		events:    events,
+		container:  container.NewVector(numWorkers),
+		numWorkers: numWorkers,
+		workers:    make([]worker.BaseProcess, 0, numWorkers),
+		allocator:  allocator,
+		events:     events,
 	}
 
 	return ww
 }
 
 type workerWatcher struct {
-	mutex     sync.RWMutex
-	stack     *Stack
-	allocator worker.Allocator
-	events    events.Handler
+	sync.RWMutex
+	container container.Vector
+	// used to control the Destroy stage (that all workers are in the container)
+	numWorkers uint64
+	workers    []worker.BaseProcess
+	allocator  worker.Allocator
+	events     events.Handler
 }
 
 func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
 	for i := 0; i < len(workers); i++ {
-		ww.stack.Push(workers[i])
+		ww.container.Enqueue(workers[i])
+		// add worker to watch slice
+		ww.workers = append(ww.workers, workers[i])
 
 		go func(swc worker.BaseProcess) {
 			ww.wait(swc)
@@ -38,75 +47,96 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
 	return nil
 }
 
+// return value from Get
+type get struct {
+	w   worker.BaseProcess
+	err error
+}
+
 // Get is not a thread safe operation
 func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+	c := make(chan get, 1)
 	const op = errors.Op("worker_watcher_get_free_worker")
-	// FAST PATH
-	// thread safe operation
-	w, stop := ww.stack.Pop()
-	if stop {
-		return nil, errors.E(op, errors.WatcherStopped)
-	}
-
-	// fast path, worker not nil and in the ReadyState
-	if w != nil && w.State().Value() == worker.StateReady {
-		return w, nil
-	}
-	// =========================================================
-	// SLOW PATH
-	// Put worker back (no matter it's state, it will be killed next)
-	if w != nil {
-		ww.stack.Push(w)
-	}
-	// no free workers in the stack
-	// try to continuously get free one
-	for {
-		select {
-		default:
-			w, stop = ww.stack.Pop()
-			if stop {
-				return nil, errors.E(op, errors.WatcherStopped)
+	go func() {
+		// FAST PATH
+		// thread safe operation
+		w, stop := ww.container.Dequeue()
+		if stop {
+			c <- get{
+				nil,
+				errors.E(op, errors.WatcherStopped),
 			}
-			if w == nil {
-				continue
+			return
+		}
+
+		// fast path, worker not nil and in the ReadyState
+		if w.State().Value() == worker.StateReady {
+			c <- get{
+				w,
+				nil,
 			}
+			return
+		}
+		// =========================================================
+		// SLOW PATH
+		_ = w.Kill()
+		// no free workers in the container
+		// try to continuously get free one
+		for {
+			select {
+			default:
+				w, stop = ww.container.Dequeue()
+				if stop {
+					c <- get{
+						nil,
+						errors.E(op, errors.WatcherStopped),
					}
+				}
 
-			switch w.State().Value() {
-			// return only workers in the Ready state
-			// check first
-			case worker.StateReady:
-				return w, nil
-			case worker.StateRemove:
-				err := ww.Remove(w)
-				if err != nil {
-					return nil, errors.E(op, err)
+				switch w.State().Value() {
+				// return only workers in the Ready state
+				// check first
+				case worker.StateReady:
+					c <- get{
+						w,
+						nil,
+					}
+					return
+				case worker.StateWorking: // how??
+					ww.container.Enqueue(w)
+					continue
+				case
+					// all the possible wrong states
+					worker.StateInactive,
+					worker.StateDestroyed,
+					worker.StateErrored,
+					worker.StateStopped,
+					worker.StateInvalid,
+					worker.StateKilling,
+					worker.StateStopping:
+					// worker doing no work because it in the container
+					// so we can safely kill it (inconsistent state)
+					_ = w.Kill()
+					// try to get new worker
+					continue
 				}
-				// try to get next
-				continue
-			case
-				// all the possible wrong states
-				worker.StateInactive,
-				worker.StateDestroyed,
-				worker.StateErrored,
-				worker.StateStopped,
-				worker.StateInvalid,
-				worker.StateKilling,
-				worker.StateWorking, // ??? how
-				worker.StateStopping:
-				// worker doing no work because it in the stack
-				// so we can safely kill it (inconsistent state)
-				_ = w.Kill()
-				// try to get new worker
-				continue
 			}
-		case <-ctx.Done():
-			return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
 		}
+	}()
+
+	select {
+	case r := <-c:
+		if r.err != nil {
+			return nil, r.err
+		}
+		return r.w, nil
+	case <-ctx.Done():
+		return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
 	}
 }
 
 func (ww *workerWatcher) Allocate() error {
-	ww.mutex.Lock()
+	ww.Lock()
 	const op = errors.Op("worker_watcher_allocate_new")
 	sw, err := ww.allocator()
 	if err != nil {
@@ -114,47 +144,83 @@ func (ww *workerWatcher) Allocate() error {
 	}
 
 	ww.addToWatch(sw)
-	ww.mutex.Unlock()
-	ww.Push(sw)
+	ww.workers = append(ww.workers, sw)
+
+	ww.Unlock()
+	ww.Push(sw)
 	return nil
 }
 
 // Remove
-func (ww *workerWatcher) Remove(wb worker.BaseProcess) error {
-	ww.mutex.Lock()
-	defer ww.mutex.Unlock()
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+	ww.Lock()
+	defer ww.Unlock()
 
-	const op = errors.Op("worker_watcher_remove_worker")
 	// set remove state
-	wb.State().Set(worker.StateRemove)
-	if ww.stack.FindAndRemoveByPid(wb.Pid()) {
-		err := wb.Kill()
-		if err != nil {
-			return errors.E(op, err)
+	pid := wb.Pid()
+
+	// worker will be removed on the Get operation
+	for i := 0; i < len(ww.workers); i++ {
+		if ww.workers[i].Pid() == pid {
+			ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+			// kill worker
+			_ = wb.Kill()
+			return
 		}
-		return nil
 	}
-
-	return nil
 }
 
 // O(1) operation
 func (ww *workerWatcher) Push(w worker.BaseProcess) {
-	ww.mutex.Lock()
-	defer ww.mutex.Unlock()
-	ww.stack.Push(w)
+	ww.container.Enqueue(w)
 }
 
-// Destroy all underlying stack (but let them to complete the task)
+// Destroy all underlying container (but let them to complete the task)
 func (ww *workerWatcher) Destroy(ctx context.Context) {
-	// destroy stack, we don't use ww mutex here, since we should be able to push worker
-	ww.stack.Destroy(ctx)
+	// destroy container, we don't use ww mutex here, since we should be able to push worker
+	ww.Lock()
+	// do not release new workers
+	ww.container.Destroy()
+	ww.Unlock()
+
+	tt := time.NewTicker(time.Millisecond * 500)
+	defer tt.Stop()
+	for {
+		select {
+		case <-tt.C:
+			ww.Lock()
+			// that might be one of the workers is working
+			if ww.numWorkers != uint64(len(ww.workers)) {
+				ww.Unlock()
+				continue
+			}
+			ww.Unlock()
+			// unnecessary mutex, but
+			// just to make sure. All container at this moment are in the container
+			// Pop operation is blocked, push can't be done, since it's not possible to pop
+			ww.Lock()
+			for i := 0; i < len(ww.workers); i++ {
+				ww.workers[i].State().Set(worker.StateDestroyed)
+				// kill the worker
+				_ = ww.workers[i].Kill()
+			}
+			return
+		}
+	}
 }
 
 // Warning, this is O(n) operation, and it will return copy of the actual workers
 func (ww *workerWatcher) List() []worker.BaseProcess {
-	return ww.stack.Workers()
+	ww.Lock()
+	defer ww.Unlock()
+
+	base := make([]worker.BaseProcess, 0, len(ww.workers))
+	for i := 0; i < len(ww.workers); i++ {
+		base = append(base, ww.workers[i])
+	}
+
+	return base
 }
 
 func (ww *workerWatcher) wait(w worker.BaseProcess) {
@@ -174,7 +240,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 		return
 	}
 
-	_ = ww.stack.FindAndRemoveByPid(w.Pid())
+	ww.Remove(w)
 	err = ww.Allocate()
 	if err != nil {
 		ww.events.Push(events.PoolEvent{
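
The rewritten Get follows a standard Go pattern for putting a deadline on a blocking call: the blocking Dequeue runs in a goroutine that reports through a buffered channel, and the caller selects between that channel and ctx.Done(). Below is a minimal, self-contained sketch of the same pattern; the int queue, result struct, and getWithContext are illustrative stand-ins, not RoadRunner APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result mirrors the diff's private `get` struct: a value plus an error,
// sent over one channel so the caller can race it against a context.
type result struct {
	v   int
	err error
}

// getWithContext sketches the Get pattern. The channel has capacity 1,
// matching the diff's make(chan get, 1): the producing goroutine can
// always complete its send and exit, even if the caller has already
// returned on ctx.Done(). The goroutine itself still blocks on the
// dequeue until an item arrives or the queue is closed; in the real
// watcher, Destroy is what releases a blocked Dequeue.
func getWithContext(ctx context.Context, q chan int) (int, error) {
	c := make(chan result, 1)
	go func() {
		v, ok := <-q // blocking "Dequeue"
		if !ok {
			c <- result{0, errors.New("watcher stopped")}
			return
		}
		c <- result{v, nil}
	}()

	select {
	case r := <-c:
		return r.v, r.err
	case <-ctx.Done():
		// corresponds to the errors.NoFreeWorkers branch in the diff
		return 0, fmt.Errorf("no free workers: %w", ctx.Err())
	}
}

func main() {
	q := make(chan int, 1)
	q <- 42

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	fmt.Println(getWithContext(ctx, q)) // 42 <nil>
	// queue now empty: this call times out instead of blocking forever
	fmt.Println(getWithContext(ctx, q)) // 0 no free workers: context deadline exceeded
}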
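
The new Destroy uses a second recognizable pattern: close the container so no more workers are handed out, then poll on a time.Ticker until every worker is back (ww.numWorkers == len(ww.workers)), and only then mark and kill them under the lock. A rough standalone sketch under the same assumptions; pool, idle, and destroy are hypothetical stand-ins for the watcher's fields and method.

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool stands in for workerWatcher: numWorkers is the expected total,
// idle counts workers currently returned to the container.
type pool struct {
	mu         sync.Mutex
	numWorkers int
	idle       int
}

// destroy polls every 500ms (the interval the diff uses) until all
// workers are idle, then tears them down while holding the lock.
func (p *pool) destroy() {
	tt := time.NewTicker(500 * time.Millisecond)
	defer tt.Stop()
	for range tt.C {
		p.mu.Lock()
		if p.idle != p.numWorkers {
			// some worker is still running a task; check again next tick
			p.mu.Unlock()
			continue
		}
		// every worker is back and nothing can be dequeued anymore,
		// so it is safe to destroy them all here
		fmt.Println("all workers idle; destroying")
		p.mu.Unlock()
		return
	}
}

func main() {
	p := &pool{numWorkers: 2}
	go func() {
		time.Sleep(time.Second)
		p.mu.Lock()
		p.idle = 2 // simulate both workers finishing their tasks
		p.mu.Unlock()
	}()
	p.destroy()
}

Since the select in the diff's loop has only the <-tt.C case, it behaves the same as ranging over the ticker channel as done in this sketch.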