diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go new file mode 100755 index 00000000..8788e509 --- /dev/null +++ b/pkg/worker_watcher/worker_watcher.go @@ -0,0 +1,310 @@ +package worker_watcher //nolint:golint,stylecheck + +import ( + "context" + "runtime" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Stack struct { + workers []worker.BaseProcess + mutex sync.RWMutex + destroy bool + actualNumOfWorkers int64 +} + +func NewWorkersStack() *Stack { + w := runtime.NumCPU() + return &Stack{ + workers: make([]worker.BaseProcess, 0, w), + actualNumOfWorkers: 0, + } +} + +func (stack *Stack) Reset() { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.actualNumOfWorkers = 0 + stack.workers = nil +} + +// Push worker back to the stack +// If stack in destroy state, Push will provide 100ms window to unlock the mutex +func (stack *Stack) Push(w worker.BaseProcess) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.actualNumOfWorkers++ + stack.workers = append(stack.workers, w) +} + +func (stack *Stack) IsEmpty() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return len(stack.workers) == 0 +} + +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + + // do not release new stack + if stack.destroy { + return nil, true + } + + if len(stack.workers) == 0 { + return nil, false + } + + // move worker + w := stack.workers[len(stack.workers)-1] + stack.workers = stack.workers[:len(stack.workers)-1] + stack.actualNumOfWorkers-- + return w, false +} + +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the stack, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// Workers return copy of the workers in the stack +func (stack *Stack) Workers() []worker.BaseProcess { + stack.mutex.Lock() + defer stack.mutex.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) + // copy + for _, v := range stack.workers { + workersCopy = append(workersCopy, v) + } + + return workersCopy +} + +func (stack *Stack) isDestroying() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return stack.destroy +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(ctx context.Context) { + stack.mutex.Lock() + stack.destroy = true + stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-tt.C: + stack.mutex.Lock() + // that might be one of the workers is working + if len(stack.workers) != int(stack.actualNumOfWorkers) { + stack.mutex.Unlock() + continue + } + stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.mutex.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + stack.workers[i].State().Set(internal.StateDestroyed) + } + stack.mutex.Unlock() + tt.Stop() + // clear + stack.Reset() + return + } + } +} + +// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { + ww := &workerWatcher{ + stack: NewWorkersStack(), + allocator: allocator, + initialNumWorkers: numWorkers, + actualNumWorkers: numWorkers, + events: events, + } + + return ww +} + +type workerWatcher struct { + mutex sync.RWMutex + stack *Stack + allocator worker.Allocator + initialNumWorkers int64 + actualNumWorkers int64 + events events.Handler +} + +func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { + for i := 0; i < len(workers); i++ { + sw, err := syncWorker.From(workers[i]) + if err != nil { + return err + } + ww.stack.Push(sw) + sw.AddListener(ww.events.Push) + + go func(swc worker.BaseProcess) { + ww.wait(swc) + }(sw) + } + return nil +} + +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { + const op = errors.Op("GetFreeWorker") + // thread safe operation + w, stop := ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.ErrWatcherStopped) + } + + // handle worker remove state + // in this state worker is destroyed by supervisor + if w != nil && w.State().Value() == internal.StateRemove { + err := ww.RemoveWorker(w) + if err != nil { + return nil, err + } + // try to get next + return ww.GetFreeWorker(ctx) + } + // no free stack + if w == nil { + for { + select { + default: + w, stop = ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.ErrWatcherStopped) + } + if w == nil { + continue + } + return w, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) + } + } + } + + return w, nil +} + +func (ww *workerWatcher) AllocateNew() error { + ww.stack.mutex.Lock() + const op = errors.Op("allocate new worker") + sw, err := ww.allocator() + if err != nil { + return errors.E(op, errors.WorkerAllocate, err) + } + + ww.addToWatch(sw) + ww.stack.mutex.Unlock() + ww.PushWorker(sw) + + ww.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, + Payload: sw, + }) + + return nil +} + +func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { + ww.mutex.Lock() + defer ww.mutex.Unlock() + + const op = errors.Op("remove worker") + pid := wb.Pid() + + if ww.stack.FindAndRemoveByPid(pid) { + wb.State().Set(internal.StateInvalid) + err := wb.Kill() + if err != nil { + return errors.E(op, err) + } + return nil + } + + wb.State().Set(internal.StateRemove) + return nil +} + +// O(1) operation +func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { + ww.mutex.Lock() + defer ww.mutex.Unlock() + ww.stack.Push(w) +} + +// Destroy all underlying stack (but let them to complete the task) +func (ww *workerWatcher) Destroy(ctx context.Context) { + // destroy stack, we don't use ww mutex here, since we should be able to push worker + ww.stack.Destroy(ctx) +} + +// Warning, this is O(n) operation, and it will return copy of the actual workers +func (ww *workerWatcher) WorkersList() []worker.BaseProcess { + return ww.stack.Workers() +} + +func (ww *workerWatcher) wait(w worker.BaseProcess) { + const op = errors.Op("process wait") + err := w.Wait() + if err != nil { + ww.events.Push(events.WorkerEvent{ + Event: events.EventWorkerError, + Worker: w, + Payload: errors.E(op, err), + }) + } + + if w.State().Value() == internal.StateDestroyed { + // worker was manually destroyed, no need to replace + return + } + + _ = ww.stack.FindAndRemoveByPid(w.Pid()) + err = ww.AllocateNew() + if err != nil { + ww.events.Push(events.PoolEvent{ + Event: events.EventPoolError, + Payload: errors.E(op, err), + }) + } +} + +func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { + ww.mutex.Lock() + defer ww.mutex.Unlock() + go func() { + ww.wait(wb) + }() +} |