Diffstat (limited to 'workers_watcher.go')
-rw-r--r-- | workers_watcher.go | 323
1 file changed, 323 insertions, 0 deletions
diff --git a/workers_watcher.go b/workers_watcher.go
new file mode 100644
index 00000000..f8522c46
--- /dev/null
+++ b/workers_watcher.go
@@ -0,0 +1,323 @@
+package roadrunner
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+)
+
+var ErrWatcherStopped = errors.New("watcher stopped")
+
+// Stack is a mutex-guarded LIFO storage of workers.
+type Stack struct {
+	workers []WorkerBase
+	mutex   sync.RWMutex
+	destroy bool
+}
+
+func NewWorkersStack() *Stack {
+	return &Stack{
+		workers: make([]WorkerBase, 0, 12),
+	}
+}
+
+func (stack *Stack) Reset() {
+	stack.mutex.Lock()
+	defer stack.mutex.Unlock()
+
+	stack.workers = nil
+}
+
+func (stack *Stack) Push(w WorkerBase) {
+	stack.mutex.Lock()
+	defer stack.mutex.Unlock()
+	stack.workers = append(stack.workers, w)
+}
+
+func (stack *Stack) IsEmpty() bool {
+	stack.mutex.RLock()
+	defer stack.mutex.RUnlock()
+
+	return len(stack.workers) == 0
+}
+
+// Pop returns the top worker and false, or (nil, true) once the stack
+// is being destroyed and no workers may be released anymore.
+func (stack *Stack) Pop() (WorkerBase, bool) {
+	stack.mutex.Lock()
+	defer stack.mutex.Unlock()
+	// do not release workers from a stack that is being destroyed
+	if stack.destroy {
+		return nil, true
+	}
+
+	if len(stack.workers) == 0 {
+		return nil, false
+	}
+
+	w := stack.workers[len(stack.workers)-1]
+	stack.workers = stack.workers[:len(stack.workers)-1]
+
+	return w, false
+}
+
+type WorkersWatcher struct {
+	mutex             sync.Mutex
+	stack             *Stack
+	allocator         func(args ...interface{}) (*SyncWorker, error)
+	initialNumWorkers int64
+	actualNumWorkers  int64
+	events            chan PoolEvent
+}
+
+type WorkerWatcher interface {
+	// AddToWatch adds workers to the watcher and waits on their state changes
+	AddToWatch(ctx context.Context, workers []WorkerBase) error
+	// GetFreeWorker provides the first free worker
+	GetFreeWorker(ctx context.Context) (WorkerBase, error)
+	// PushWorker enqueues a worker back onto the stack
+	PushWorker(w WorkerBase)
+	// AllocateNew allocates a new worker and puts it into the WorkerWatcher
+	AllocateNew(ctx context.Context) error
+	// Destroy destroys the underlying stack
+	Destroy(ctx context.Context)
+	// WorkersList returns all workers w/o removing them from internal storage
+	WorkersList(ctx context.Context) []WorkerBase
+	// RemoveWorker removes a worker from the stack
+	RemoveWorker(ctx context.Context, wb WorkerBase) error
+}
+
+// The allocator can be nil, but in that case dead workers will not be replaced
+func NewWorkerWatcher(allocator func(args ...interface{}) (*SyncWorker, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
+	// todo: check that events is not nil
+	ww := &WorkersWatcher{
+		stack:             NewWorkersStack(),
+		allocator:         allocator,
+		initialNumWorkers: numWorkers,
+		actualNumWorkers:  numWorkers,
+		events:            events,
+	}
+
+	return ww
+}
+
+func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+	for i := 0; i < len(workers); i++ {
+		sw, err := NewSyncWorker(workers[i])
+		if err != nil {
+			return err
+		}
+		ww.stack.Push(sw)
+		go func(swc WorkerBase) {
+			ww.watch(&swc)
+			ww.wait(ctx, &swc)
+		}(sw)
+	}
+	return nil
+}
+
+func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+	// thread-safe operation
+	w, stop := ww.stack.Pop()
+	if stop {
+		return nil, ErrWatcherStopped
+	}
+	// handle the remove state:
+	// a worker in this state has been destroyed by the supervisor
+	if w != nil && w.State().Value() == StateRemove {
+		err := ww.RemoveWorker(ctx, w)
+		if err != nil {
+			return nil, err
+		}
+		// try to get the next one
+		return ww.GetFreeWorker(ctx)
+	}
+	// no free workers: poll the stack until one is pushed back or we time out
+	if w == nil {
+		tout := time.NewTicker(time.Second * 180)
+		defer tout.Stop()
+		for {
+			select {
+			default:
+				w, stop = ww.stack.Pop()
+				if stop {
+					return nil, ErrWatcherStopped
+				}
+				if w == nil {
+					continue
+				}
+				ww.decreaseNumOfActualWorkers()
+				return w, nil
+			case <-tout.C:
+				return nil, errors.New("no free workers")
+			}
+		}
+	}
+	ww.decreaseNumOfActualWorkers()
+	return w, nil
+}
+
+func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
+	ww.stack.mutex.Lock()
+	sw, err := ww.allocator()
+	if err != nil {
+		// release the mutex on the error path, otherwise the stack deadlocks
+		ww.stack.mutex.Unlock()
+		return err
+	}
+	ww.addToWatch(*sw)
+	ww.stack.mutex.Unlock()
+	ww.PushWorker(*sw)
+	return nil
+}
+
+func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+	ww.stack.mutex.Lock()
+	defer ww.stack.mutex.Unlock()
+	pid := wb.Pid()
+	for i := 0; i < len(ww.stack.workers); i++ {
+		if ww.stack.workers[i].Pid() == pid {
+			// found in the stack: remove and kill the worker
+			ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
+			ww.decreaseNumOfActualWorkers()
+
+			wb.State().Set(StateInvalid)
+			return wb.Kill(ctx)
+		}
+	}
+	// the worker is currently handling a request: mark it for removal
+	wb.State().Set(StateRemove)
+	return nil
+}
+
+// O(1) operation
+func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
+	ww.mutex.Lock()
+	ww.actualNumWorkers++
+	ww.mutex.Unlock()
+	ww.stack.Push(w)
+}
+
+func (ww *WorkersWatcher) ReduceWorkersCount() {
+	ww.mutex.Lock()
+	ww.actualNumWorkers--
+	ww.mutex.Unlock()
+}
+
+// Destroy all underlying workers (but let them complete the current task)
+func (ww *WorkersWatcher) Destroy(ctx context.Context) {
+	ww.stack.mutex.Lock()
+	ww.stack.destroy = true
+	ww.stack.mutex.Unlock()
+
+	tt := time.NewTicker(time.Millisecond * 100)
+	for range tt.C {
+		ww.stack.mutex.Lock()
+		// wait until every worker has been returned to the stack
+		if len(ww.stack.workers) != int(ww.actualNumWorkers) {
+			ww.stack.mutex.Unlock()
+			continue
+		}
+		// all workers are in the stack at this moment:
+		// Pop is blocked and Push cannot race with us while we hold the mutex
+		for i := 0; i < len(ww.stack.workers); i++ {
+			// set the state for the workers in the stack (unused at the moment)
+			ww.stack.workers[i].State().Set(StateDestroyed)
+		}
+		ww.stack.mutex.Unlock()
+		tt.Stop()
+		// clear
+		ww.stack.Reset()
+		return
+	}
+}
+
+// Warning, this is an O(n) operation: the list is copied under the lock
+func (ww *WorkersWatcher) WorkersList(ctx context.Context) []WorkerBase {
+	ww.stack.mutex.RLock()
+	defer ww.stack.mutex.RUnlock()
+	workers := make([]WorkerBase, 0, len(ww.stack.workers))
+	workers = append(workers, ww.stack.workers...)
+	return workers
+}
+
+func (ww *WorkersWatcher) wait(ctx context.Context, w *WorkerBase) {
+	err := (*w).Wait(ctx)
+	if err != nil {
+		ww.events <- PoolEvent{Payload: WorkerEvent{
+			Event:   EventWorkerError,
+			Worker:  *w,
+			Payload: err,
+		}}
+	}
+	// if the worker was destroyed deliberately, do not reallocate
+	if (*w).State().Value() == StateDestroyed {
+		return
+	}
+	// if the dead worker is still in the stack (it was returned before dying),
+	// remove it before allocating a replacement
+	pid := (*w).Pid()
+	ww.stack.mutex.Lock()
+	for i := 0; i < len(ww.stack.workers); i++ {
+		if ww.stack.workers[i].Pid() == pid {
+			ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
+			ww.decreaseNumOfActualWorkers()
+			break
+		}
+	}
+	ww.stack.mutex.Unlock()
+	// allocate the replacement worker
+	err = ww.AllocateNew(ctx)
+	if err != nil {
+		ww.events <- PoolEvent{Payload: WorkerEvent{
+			Event:   EventWorkerError,
+			Worker:  *w,
+			Payload: err,
+		}}
+	}
+}
+
+func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
+	ww.mutex.Lock()
+	defer ww.mutex.Unlock()
+	go func() {
+		ww.wait(context.Background(), &wb)
+	}()
+}
+
+func (ww *WorkersWatcher) reallocate(wb *WorkerBase) error {
+	sw, err := ww.allocator()
+	if err != nil {
+		return err
+	}
+	*wb = *sw
+	return nil
+}
+
+func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
+	ww.mutex.Lock()
+	ww.actualNumWorkers--
+	ww.mutex.Unlock()
+}
+
+func (ww *WorkersWatcher) watch(swc *WorkerBase) {
+	// todo: add an event to stop this goroutine
+	go func() {
+		ev := <-(*swc).Events()
+		ww.events <- PoolEvent{Payload: ev}
+	}()
+}
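
For context, here is a minimal usage sketch of the new watcher. It is not part of this commit: the helper name is hypothetical, and the allocator and initial workers are assumed to come from a worker factory elsewhere in the roadrunner package. A real pool would also consume the events channel in a separate goroutine so that watcher goroutines never block on it.

// exampleWatcherUsage is a hypothetical sketch (not in this diff) showing how
// a pool might drive the watcher: watch the initial workers, borrow one,
// return it, and shut everything down.
func exampleWatcherUsage(allocator func(args ...interface{}) (*SyncWorker, error), workers []WorkerBase) error {
	ctx := context.Background()
	events := make(chan PoolEvent, 1)

	// watch the initial set of workers
	ww := NewWorkerWatcher(allocator, int64(len(workers)), events)
	if err := ww.AddToWatch(ctx, workers); err != nil {
		return err
	}

	// borrow a free worker, use it, then return it to the stack
	w, err := ww.GetFreeWorker(ctx)
	if err != nil {
		return err
	}
	// ... execute a payload on w here ...
	ww.PushWorker(w)

	// shut down, letting in-flight work finish
	ww.Destroy(ctx)
	return nil
}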