Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go | 228
1 file changed, 162 insertions(+), 66 deletions(-)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cf2e1eb7..804e4658 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,132 +3,228 @@ package worker_watcher //nolint:golint,stylecheck
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
-	"github.com/spiral/roadrunner/v2/pkg/states"
 	"github.com/spiral/roadrunner/v2/pkg/worker"
+	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
 )
 
-// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+// workerCreateFunc can be nil, but in that case, dead container will not be replaced
 func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
 	ww := &workerWatcher{
-		stack:     NewWorkersStack(numWorkers),
-		allocator: allocator,
-		events:    events,
+		container:  container.NewVector(numWorkers),
+		numWorkers: numWorkers,
+		workers:    make([]worker.BaseProcess, 0, numWorkers),
+		allocator:  allocator,
+		events:     events,
 	}
 
 	return ww
 }
 
 type workerWatcher struct {
-	mutex     sync.RWMutex
-	stack     *Stack
-	allocator worker.Allocator
-	events    events.Handler
+	sync.RWMutex
+	container container.Vector
+	// used to control the Destroy stage (that all workers are in the container)
+	numWorkers uint64
+	workers    []worker.BaseProcess
+	allocator  worker.Allocator
+	events     events.Handler
 }
 
-func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
 	for i := 0; i < len(workers); i++ {
-		ww.stack.Push(workers[i])
+		ww.container.Enqueue(workers[i])
+		// add worker to watch slice
+		ww.workers = append(ww.workers, workers[i])
 
-		go func(swc worker.SyncWorker) {
+		go func(swc worker.BaseProcess) {
 			ww.wait(swc)
 		}(workers[i])
 	}
 	return nil
 }
 
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
+// return value from Get
+type get struct {
+	w   worker.BaseProcess
+	err error
+}
+
+// Get is not a thread safe operation
+func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+	c := make(chan get, 1)
 	const op = errors.Op("worker_watcher_get_free_worker")
-	// thread safe operation
-	w, stop := ww.stack.Pop()
-	if stop {
-		return nil, errors.E(op, errors.WatcherStopped)
-	}
+	go func() {
+		// FAST PATH
+		// thread safe operation
+		w, stop := ww.container.Dequeue()
+		if stop {
+			c <- get{
+				nil,
+				errors.E(op, errors.WatcherStopped),
+			}
+			return
+		}
 
-	// handle worker remove state
-	// in this state worker is destroyed by supervisor
-	if w != nil && w.State().Value() == states.StateRemove {
-		err := ww.RemoveWorker(w)
-		if err != nil {
-			return nil, err
+		// fast path, worker not nil and in the ReadyState
+		if w.State().Value() == worker.StateReady {
+			c <- get{
+				w,
+				nil,
+			}
+			return
 		}
-		// try to get next
-		return ww.GetFreeWorker(ctx)
-	}
-	// no free stack
-	if w == nil {
+
+		// =========================================================
+		// SLOW PATH
+		_ = w.Kill()
+		// no free workers in the container
+		// try to continuously get free one
 		for {
 			select {
 			default:
-				w, stop = ww.stack.Pop()
+				w, stop = ww.container.Dequeue()
 				if stop {
-					return nil, errors.E(op, errors.WatcherStopped)
+					c <- get{
+						nil,
+						errors.E(op, errors.WatcherStopped),
+					}
 				}
-				if w == nil {
+
+				switch w.State().Value() {
+				// return only workers in the Ready state
+				// check first
+				case worker.StateReady:
+					c <- get{
+						w,
+						nil,
+					}
+					return
+				case worker.StateWorking: // how??
+					ww.container.Enqueue(w) // put it back, let worker finish the work
+					continue
+				case
+					// all the possible wrong states
+					worker.StateInactive,
+					worker.StateDestroyed,
+					worker.StateErrored,
+					worker.StateStopped,
+					worker.StateInvalid,
+					worker.StateKilling,
+					worker.StateStopping:
+					// worker doing no work because it in the container
+					// so we can safely kill it (inconsistent state)
+					_ = w.Kill()
+					// try to get new worker
 					continue
 				}
-				return w, nil
-			case <-ctx.Done():
-				return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
 			}
 		}
-	}
+	}()
 
-	return w, nil
+	select {
+	case r := <-c:
+		if r.err != nil {
+			return nil, r.err
+		}
+		return r.w, nil
+	case <-ctx.Done():
+		return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
+	}
 }
 
-func (ww *workerWatcher) AllocateNew() error {
-	ww.stack.mutex.Lock()
+func (ww *workerWatcher) Allocate() error {
+	ww.Lock()
 	const op = errors.Op("worker_watcher_allocate_new")
 	sw, err := ww.allocator()
 	if err != nil {
 		return errors.E(op, errors.WorkerAllocate, err)
 	}
 
+	// add worker to Wait
 	ww.addToWatch(sw)
-	ww.stack.mutex.Unlock()
-	ww.PushWorker(sw)
+	// add new worker to the workers slice (to get information about workers in parallel)
+	ww.workers = append(ww.workers, sw)
+
+	// unlock Allocate mutex
+	ww.Unlock()
+	// push the worker to the container
+	ww.Push(sw)
 	return nil
 }
 
-func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
-	ww.mutex.Lock()
-	defer ww.mutex.Unlock()
+// Remove
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+	ww.Lock()
+	defer ww.Unlock()
 
-	const op = errors.Op("worker_watcher_remove_worker")
+	// set remove state
 	pid := wb.Pid()
 
-	if ww.stack.FindAndRemoveByPid(pid) {
-		wb.State().Set(states.StateRemove)
-		err := wb.Kill()
-		if err != nil {
-			return errors.E(op, err)
+	// worker will be removed on the Get operation
+	for i := 0; i < len(ww.workers); i++ {
+		if ww.workers[i].Pid() == pid {
+			ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+			// kill worker
+			_ = wb.Kill()
+			return
 		}
-		return nil
 	}
-
-	return nil
 }
 
 // O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
-	ww.mutex.Lock()
-	defer ww.mutex.Unlock()
-	ww.stack.Push(w)
+func (ww *workerWatcher) Push(w worker.BaseProcess) {
+	ww.container.Enqueue(w)
}
 
-// Destroy all underlying stack (but let them to complete the task)
+// Destroy all underlying container (but let them to complete the task)
 func (ww *workerWatcher) Destroy(ctx context.Context) {
-	// destroy stack, we don't use ww mutex here, since we should be able to push worker
-	ww.stack.Destroy(ctx)
+	// destroy container, we don't use ww mutex here, since we should be able to push worker
+	ww.Lock()
+	// do not release new workers
+	ww.container.Destroy()
+	ww.Unlock()
+
+	tt := time.NewTicker(time.Millisecond * 100)
+	defer tt.Stop()
+	for {
+		select {
+		case <-tt.C:
+			ww.Lock()
+			// that might be one of the workers is working
+			if ww.numWorkers != uint64(len(ww.workers)) {
+				ww.Unlock()
+				continue
+			}
+			ww.Unlock()
+			// unnecessary mutex, but
+			// just to make sure. All workers at this moment are in the container
+			// Pop operation is blocked, push can't be done, since it's not possible to pop
+			ww.Lock()
+			for i := 0; i < len(ww.workers); i++ {
+				ww.workers[i].State().Set(worker.StateDestroyed)
+				// kill the worker
+				_ = ww.workers[i].Kill()
+			}
+			return
+		}
+	}
 }
 
 // Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
-	return ww.stack.Workers()
+func (ww *workerWatcher) List() []worker.BaseProcess {
+	ww.Lock()
+	defer ww.Unlock()
+
+	base := make([]worker.BaseProcess, 0, len(ww.workers))
+	for i := 0; i < len(ww.workers); i++ {
+		base = append(base, ww.workers[i])
+	}
+
+	return base
 }
 
 func (ww *workerWatcher) wait(w worker.BaseProcess) {
@@ -142,14 +238,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 		})
 	}
 
-	if w.State().Value() == states.StateDestroyed {
+	if w.State().Value() == worker.StateDestroyed {
 		// worker was manually destroyed, no need to replace
 		ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
 		return
 	}
 
-	_ = ww.stack.FindAndRemoveByPid(w.Pid())
-	err = ww.AllocateNew()
+	ww.Remove(w)
+	err = ww.Allocate()
 	if err != nil {
 		ww.events.Push(events.PoolEvent{
 			Event:   events.EventPoolError,
@@ -158,7 +254,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 	}
 }
 
-func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
 	go func() {
 		ww.wait(wb)
 	}()
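
The core change above replaces the recursive GetFreeWorker with a channel-based Get: a goroutine spins on the container and reports through a buffered result channel, while the caller selects between that channel and ctx.Done(), so a timeout can fire even while the dequeue loop is still running. Below is a minimal, self-contained sketch of that shape, not roadrunner's actual API; the Worker and Vector types and every helper name here are invented stand-ins for illustration.

// sketch of the Get fast/slow-path pattern (invented types, not roadrunner's API)
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Worker is an invented stand-in for worker.BaseProcess.
type Worker struct{ pid int }

// Vector is a toy version of the container: a buffered channel guarded by a
// destroyed flag, loosely mirroring Enqueue/Dequeue/Destroy in the diff.
type Vector struct {
	mu        sync.Mutex
	destroyed bool
	ch        chan *Worker
}

func NewVector(n int) *Vector { return &Vector{ch: make(chan *Worker, n)} }

func (v *Vector) Enqueue(w *Worker) { v.ch <- w }

// Dequeue returns (nil, true) once the vector has been destroyed, and
// (nil, false) when nothing is free right now.
func (v *Vector) Dequeue() (*Worker, bool) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if v.destroyed {
		return nil, true
	}
	select {
	case w := <-v.ch:
		return w, false
	default:
		return nil, false
	}
}

func (v *Vector) Destroy() {
	v.mu.Lock()
	v.destroyed = true
	v.mu.Unlock()
}

// get carries a result out of the dequeue goroutine, as in the diff.
type get struct {
	w   *Worker
	err error
}

// Get races a dequeue loop against the caller's deadline: the goroutine
// reports through a buffered channel (so its send never blocks), and the
// select below picks whichever comes first, a worker or ctx.Done().
func Get(ctx context.Context, v *Vector) (*Worker, error) {
	c := make(chan get, 1)
	go func() {
		for {
			w, stop := v.Dequeue()
			if stop {
				c <- get{nil, errors.New("watcher stopped")}
				return
			}
			if w != nil {
				c <- get{w, nil}
				return
			}
			time.Sleep(time.Millisecond) // nothing free yet; avoid a hot spin
		}
	}()

	select {
	case r := <-c:
		return r.w, r.err
	case <-ctx.Done():
		return nil, errors.New("no free workers, timeout exceeded")
	}
}

func main() {
	v := NewVector(2)
	v.Enqueue(&Worker{pid: 1})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	if w, err := Get(ctx, v); err == nil {
		fmt.Println("got worker", w.pid) // fast path: a worker was free
	}

	v.Destroy() // after this, Dequeue reports stop and Get fails fast
	if _, err := Get(ctx, v); err != nil {
		fmt.Println(err) // watcher stopped
	}
}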
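The new Destroy follows a drain pattern: close the container so no new worker can be dequeued, then poll on a 100ms ticker until the bookkeeping slice shows the full worker count, and only then mark and kill everything. Continuing the sketch above with the same invented types, under the same caveats:

// watcher is an invented aggregate holding the container plus the
// bookkeeping slice the diff uses to decide when draining is safe.
type watcher struct {
	mu         sync.Mutex
	v          *Vector
	workers    []*Worker
	numWorkers int
}

// DestroyAll closes the container, then polls on a ticker until every
// watched worker is accounted for before killing them, mirroring the
// shape of the Destroy loop in the diff.
func (ww *watcher) DestroyAll() {
	ww.mu.Lock()
	ww.v.Destroy() // no new workers will be handed out from now on
	ww.mu.Unlock()

	tt := time.NewTicker(100 * time.Millisecond)
	defer tt.Stop()
	for range tt.C {
		ww.mu.Lock()
		// a dying worker is removed from the slice before its replacement
		// is appended, so a short slice means a replacement is in flight
		if len(ww.workers) != ww.numWorkers {
			ww.mu.Unlock()
			continue
		}
		for _, w := range ww.workers {
			fmt.Println("killing worker", w.pid) // stand-in for Set(StateDestroyed) + Kill()
		}
		ww.mu.Unlock()
		return
	}
}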