summaryrefslogtreecommitdiff
path: root/workers_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'workers_watcher.go')
-rw-r--r--workers_watcher.go323
1 files changed, 323 insertions, 0 deletions
diff --git a/workers_watcher.go b/workers_watcher.go
new file mode 100644
index 00000000..f8522c46
--- /dev/null
+++ b/workers_watcher.go
@@ -0,0 +1,323 @@
+package roadrunner
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+)
+
+var ErrWatcherStopped = errors.New("watcher stopped")
+
+type Stack struct {
+ workers []WorkerBase
+ mutex sync.RWMutex
+ destroy bool
+}
+
+func NewWorkersStack() *Stack {
+ return &Stack{
+ workers: make([]WorkerBase, 0, 12),
+ }
+}
+
+func (stack *Stack) Reset() {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+
+ stack.workers = nil
+}
+
+func (stack *Stack) Push(w WorkerBase) {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ stack.workers = append(stack.workers, w)
+}
+
+func (stack *Stack) IsEmpty() bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+
+ return len(stack.workers) == 0
+}
+
+func (stack *Stack) Pop() (WorkerBase, bool) {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ // do not release new stack
+ if stack.destroy {
+ return nil, true
+ }
+
+ if len(stack.workers) == 0 {
+ return nil, false
+ }
+
+ w := stack.workers[len(stack.workers)-1]
+ stack.workers = stack.workers[:len(stack.workers)-1]
+
+ return w, false
+}
+
+type WorkersWatcher struct {
+ mutex sync.Mutex
+ stack *Stack
+ allocator func(args ...interface{}) (*SyncWorker, error)
+ initialNumWorkers int64
+ actualNumWorkers int64
+ events chan PoolEvent
+}
+
+type WorkerWatcher interface {
+ // AddToWatch used to add stack to wait its state
+ AddToWatch(ctx context.Context, workers []WorkerBase) error
+ // GetFreeWorker provide first free worker
+ GetFreeWorker(ctx context.Context) (WorkerBase, error)
+ // PutWorker enqueues worker back
+ PushWorker(w WorkerBase)
+ // AllocateNew used to allocate new worker and put in into the WorkerWatcher
+ AllocateNew(ctx context.Context) error
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+ // WorkersList return all stack w/o removing it from internal storage
+ WorkersList(ctx context.Context) []WorkerBase
+ // RemoveWorker remove worker from the stack
+ RemoveWorker(ctx context.Context, wb WorkerBase) error
+}
+
+// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+func NewWorkerWatcher(allocator func(args ...interface{}) (*SyncWorker, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
+ // todo check if events not nil
+ ww := &WorkersWatcher{
+ stack: NewWorkersStack(),
+ allocator: allocator,
+ initialNumWorkers: numWorkers,
+ actualNumWorkers: numWorkers,
+ events: events,
+ }
+
+ return ww
+}
+
+func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+ for i := 0; i < len(workers); i++ {
+ sw, err := NewSyncWorker(workers[i])
+ if err != nil {
+ return err
+ }
+ ww.stack.Push(sw)
+ go func(swc WorkerBase) {
+ //ww.mutex.Lock()
+ ww.watch(&swc)
+ ww.wait(ctx, &swc)
+ //ww.mutex.Unlock()
+ }(sw)
+ }
+ return nil
+}
+
+func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+ // thread safe operation
+ w, stop := ww.stack.Pop()
+ if stop {
+ return nil, ErrWatcherStopped
+ }
+ // handle worker remove state
+ // in this state worker is destroyed by supervisor
+ if w != nil && w.State().Value() == StateRemove {
+ err := ww.RemoveWorker(ctx, w)
+ if err != nil {
+ return nil, err
+ }
+ // try to get next
+ return ww.GetFreeWorker(ctx)
+ }
+ // no free stack
+ if w == nil {
+ tout := time.NewTicker(time.Second * 180)
+ defer tout.Stop()
+ for {
+ select {
+ default:
+ w, stop = ww.stack.Pop()
+ if stop {
+ return nil, ErrWatcherStopped
+ }
+ if w == nil {
+ continue
+ }
+ ww.actualNumWorkers--
+ return w, nil
+ case <-tout.C:
+ return nil, errors.New("no free stack")
+ }
+ }
+ }
+ ww.actualNumWorkers--
+ return w, nil
+}
+
+func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
+ ww.stack.mutex.Lock()
+ sw, err := ww.allocator()
+ if err != nil {
+ return err
+ }
+ ww.addToWatch(*sw)
+ ww.stack.mutex.Unlock()
+ ww.PushWorker(*sw)
+ return nil
+}
+
+func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+ ww.stack.mutex.Lock()
+ defer ww.stack.mutex.Unlock()
+ pid := wb.Pid()
+ for i := 0; i < len(ww.stack.workers); i++ {
+ if ww.stack.workers[i].Pid() == pid {
+ // found in the stack
+ // remove worker
+ ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
+ ww.decreaseNumOfActualWorkers()
+
+ wb.State().Set(StateInvalid)
+ err := wb.Kill(ctx)
+ if err != nil {
+ return err
+ }
+ break
+ }
+ }
+ // worker currently handle request, set state Remove
+ wb.State().Set(StateRemove)
+ return nil
+}
+
+// O(1) operation
+func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
+ ww.mutex.Lock()
+ ww.actualNumWorkers++
+ ww.mutex.Unlock()
+ ww.stack.Push(w)
+}
+
+func (ww *WorkersWatcher) ReduceWorkersCount() {
+ ww.mutex.Unlock()
+ ww.actualNumWorkers--
+ ww.mutex.Lock()
+}
+
+// Destroy all underlying stack (but let them to complete the task)
+func (ww *WorkersWatcher) Destroy(ctx context.Context) {
+ ww.stack.mutex.Lock()
+ ww.stack.destroy = true
+ ww.stack.mutex.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ for {
+ select {
+ case <-tt.C:
+ if len(ww.stack.workers) != int(ww.actualNumWorkers) {
+ continue
+ }
+ // unnecessary mutex, but
+ // just to make sure. All stack at this moment are in the stack
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ ww.stack.mutex.Lock()
+ for i := 0; i < len(ww.stack.workers); i++ {
+ // set state for the stack in the stack (unused at the moment)
+ ww.stack.workers[i].State().Set(StateDestroyed)
+ }
+ ww.stack.mutex.Unlock()
+ tt.Stop()
+ // clear
+ ww.stack.Reset()
+ return
+ }
+ }
+}
+
+// Warning, this is O(n) operation
+func (ww *WorkersWatcher) WorkersList(ctx context.Context) []WorkerBase {
+ return ww.stack.workers
+}
+
+func (ww *WorkersWatcher) wait(ctx context.Context, w *WorkerBase) {
+ err := (*w).Wait(ctx)
+ if err != nil {
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerError,
+ Worker: *w,
+ Payload: err,
+ }}
+ }
+ // If not destroyed, reallocate
+ if (*w).State().Value() != StateDestroyed {
+ pid := (*w).Pid()
+ for i := 0; i < len(ww.stack.workers); i++ {
+ // worker in the stack, reallocating
+ if ww.stack.workers[i].Pid() == pid {
+ ww.stack.mutex.Lock()
+ ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
+
+ ww.decreaseNumOfActualWorkers()
+
+ ww.stack.mutex.Unlock()
+ err = ww.AllocateNew(ctx)
+ if err != nil {
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerError,
+ Worker: *w,
+ Payload: err,
+ }}
+ return
+ }
+ return
+ }
+ }
+ // worker not in the stack (not returned), forget and allocate new
+ err = ww.AllocateNew(ctx)
+ if err != nil {
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerError,
+ Worker: *w,
+ Payload: err,
+ }}
+ return
+ }
+ }
+ return
+}
+
+func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
+ ww.mutex.Lock()
+ defer ww.mutex.Unlock()
+ go func() {
+ ww.wait(context.Background(), &wb)
+ }()
+}
+
+func (ww *WorkersWatcher) reallocate(wb *WorkerBase) error {
+ sw, err := ww.allocator()
+ if err != nil {
+ return err
+ }
+ *wb = *sw
+ return nil
+}
+
+func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
+ ww.mutex.Lock()
+ ww.actualNumWorkers--
+ ww.mutex.Unlock()
+}
+
+func (ww *WorkersWatcher) watch(swc *WorkerBase) {
+ // todo make event to stop function
+ go func() {
+ select {
+ case ev := <-(*swc).Events():
+ ww.events <- PoolEvent{Payload: ev}
+ }
+ }()
+}