diff options
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 13 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack.go (renamed from pkg/worker_watcher/stack.go) | 79 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack_test.go (renamed from pkg/worker_watcher/stack_test.go) | 5 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 45 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 27 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 228 |
6 files changed, 277 insertions, 120 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go new file mode 100644 index 00000000..532bace9 --- /dev/null +++ b/pkg/worker_watcher/container/interface.go @@ -0,0 +1,13 @@ +package container + +import "github.com/spiral/roadrunner/v2/pkg/worker" + +// Vector interface represents vector container +type Vector interface { + // Enqueue used to put worker to the vector + Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector + Dequeue() (worker.BaseProcess, bool) + // Destroy used to stop releasing the workers + Destroy() +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go index 55034e41..fb8ecd3b 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/container/stack.go @@ -1,17 +1,17 @@ -package worker_watcher //nolint:golint,stylecheck +package container + import ( "context" "runtime" "sync" "time" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []*worker.SyncWorkerImpl - mutex sync.RWMutex + sync.RWMutex + workers []worker.BaseProcess destroy bool actualNumOfWorkers uint64 initialNumOfWorkers uint64 @@ -20,39 +20,39 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]*worker.SyncWorkerImpl, 0, w), + workers: make([]worker.BaseProcess, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } } func (stack *Stack) Reset() { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers = 0 stack.workers = nil } -// Push worker back to the stack -// If stack in destroy state, Push will provide 100ms window to unlock the mutex +// Push worker back to the vec +// If vec in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w worker.BaseProcess) { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) + stack.workers = append(stack.workers, w) } func (stack *Stack) IsEmpty() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return len(stack.workers) == 0 } -func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { - stack.mutex.Lock() - defer stack.mutex.Unlock() +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.Lock() + defer stack.Unlock() - // do not release new stack + // do not release new vec if stack.destroy { return nil, true } @@ -69,10 +69,10 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { } func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() for i := 0; i < len(stack.workers); i++ { - // worker in the stack, reallocating + // worker in the vec, reallocating if stack.workers[i].Pid() == pid { stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) stack.actualNumOfWorkers-- @@ -84,12 +84,13 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { return false } -// Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.SyncWorker { - stack.mutex.Lock() - defer stack.mutex.Unlock() - workersCopy := make([]worker.SyncWorker, 0, 1) +// Workers return copy of the workers in the vec +func (stack *Stack) Workers() []worker.BaseProcess { + stack.Lock() + defer stack.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) // copy + // TODO pointers, copy have no sense for _, v := range stack.workers { if v != nil { workersCopy = append(workersCopy, v) @@ -100,40 +101,40 @@ func (stack *Stack) Workers() []worker.SyncWorker { } func (stack *Stack) isDestroying() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return stack.destroy } // we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(ctx context.Context) { - stack.mutex.Lock() +func (stack *Stack) Destroy(_ context.Context) { + stack.Lock() stack.destroy = true - stack.mutex.Unlock() + stack.Unlock() tt := time.NewTicker(time.Millisecond * 500) defer tt.Stop() for { select { case <-tt.C: - stack.mutex.Lock() + stack.Lock() // that might be one of the workers is working if stack.initialNumOfWorkers != stack.actualNumOfWorkers { - stack.mutex.Unlock() + stack.Unlock() continue } - stack.mutex.Unlock() + stack.Unlock() // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack + // just to make sure. All vec at this moment are in the vec // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.mutex.Lock() + stack.Lock() for i := 0; i < len(stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(states.StateDestroyed) + // set state for the vec in the vec (unused at the moment) + stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } - stack.mutex.Unlock() + stack.Unlock() // clear stack.Reset() return diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go index 5287a6dc..d699664c 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/container/stack_test.go @@ -1,4 +1,5 @@ -package worker_watcher //nolint:golint,stylecheck +package container + import ( "context" "os/exec" @@ -12,7 +13,7 @@ import ( func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) + assert.Equal(t, []worker.BaseProcess{}, stack.workers) } func TestStack_Push(t *testing.T) { diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go new file mode 100644 index 00000000..239b01c7 --- /dev/null +++ b/pkg/worker_watcher/container/vec.go @@ -0,0 +1,45 @@ +package container + +import ( + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + destroy uint64 + workers chan worker.BaseProcess +} + +func NewVector(initialNumOfWorkers uint64) Vector { + vec := &Vec{ + destroy: 0, + workers: make(chan worker.BaseProcess, initialNumOfWorkers), + } + + return vec +} + +func (v *Vec) Enqueue(w worker.BaseProcess) { + v.workers <- w +} + +func (v *Vec) Dequeue() (worker.BaseProcess, bool) { + /* + if *addr == old { + *addr = new + return true + } + */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + return nil, true + } + + w := <-v.workers + + return w, false +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 927aa270..4625b7a7 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -6,25 +6,26 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker" ) +// Watcher is an interface for the Sync workers lifecycle type Watcher interface { - // AddToWatch used to add stack to wait its state - AddToWatch(workers []worker.SyncWorker) error + // Watch used to add workers to the container + Watch(workers []worker.BaseProcess) error - // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) + // Get provide first free worker + Get(ctx context.Context) (worker.BaseProcess, error) - // PutWorker enqueues worker back - PushWorker(w worker.SyncWorker) + // Push enqueues worker back + Push(w worker.BaseProcess) - // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew() error + // Allocate - allocates new worker and put it into the WorkerWatcher + Allocate() error - // Destroy destroys the underlying stack + // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all stack w/o removing it from internal storage - WorkersList() []worker.SyncWorker + // WorkersList return all container w/o removing it from internal storage + List() []worker.BaseProcess - // RemoveWorker remove worker from the stack - RemoveWorker(wb worker.SyncWorker) error + // RemoveWorker remove worker from the container + Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cf2e1eb7..804e4658 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,132 +3,228 @@ package worker_watcher //nolint:golint,stylecheck import ( "context" "sync" + "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +// workerCreateFunc can be nil, but in that case, dead container will not be replaced func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(numWorkers), - allocator: allocator, - events: events, + container: container.NewVector(numWorkers), + numWorkers: numWorkers, + workers: make([]worker.BaseProcess, 0, numWorkers), + allocator: allocator, + events: events, } return ww } type workerWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator worker.Allocator - events events.Handler + sync.RWMutex + container container.Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + workers []worker.BaseProcess + allocator worker.Allocator + events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { +func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.stack.Push(workers[i]) + ww.container.Enqueue(workers[i]) + // add worker to watch slice + ww.workers = append(ww.workers, workers[i]) - go func(swc worker.SyncWorker) { + go func(swc worker.BaseProcess) { ww.wait(swc) }(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { +// return value from Get +type get struct { + w worker.BaseProcess + err error +} + +// Get is not a thread safe operation +func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { + c := make(chan get, 1) const op = errors.Op("worker_watcher_get_free_worker") - // thread safe operation - w, stop := ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } + go func() { + // FAST PATH + // thread safe operation + w, stop := ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), + } + return + } - // handle worker remove state - // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == states.StateRemove { - err := ww.RemoveWorker(w) - if err != nil { - return nil, err + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + c <- get{ + w, + nil, + } + return } - // try to get next - return ww.GetFreeWorker(ctx) - } - // no free stack - if w == nil { + // ========================================================= + // SLOW PATH + _ = w.Kill() + // no free workers in the container + // try to continuously get free one for { select { default: - w, stop = ww.stack.Pop() + w, stop = ww.container.Dequeue() if stop { - return nil, errors.E(op, errors.WatcherStopped) + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), + } } - if w == nil { + + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, + } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker continue } - return w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } } - } + }() - return w, nil + select { + case r := <-c: + if r.err != nil { + return nil, r.err + } + return r.w, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed")) + } } -func (ww *workerWatcher) AllocateNew() error { - ww.stack.mutex.Lock() +func (ww *workerWatcher) Allocate() error { + ww.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { return errors.E(op, errors.WorkerAllocate, err) } + // add worker to Wait ww.addToWatch(sw) - ww.stack.mutex.Unlock() - ww.PushWorker(sw) + // add new worker to the workers slice (to get information about workers in parallel) + ww.workers = append(ww.workers, sw) + + // unlock Allocate mutex + ww.Unlock() + // push the worker to the container + ww.Push(sw) return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { - ww.mutex.Lock() - defer ww.mutex.Unlock() +// Remove +func (ww *workerWatcher) Remove(wb worker.BaseProcess) { + ww.Lock() + defer ww.Unlock() - const op = errors.Op("worker_watcher_remove_worker") + // set remove state pid := wb.Pid() - if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(states.StateRemove) - err := wb.Kill() - if err != nil { - return errors.E(op, err) + // worker will be removed on the Get operation + for i := 0; i < len(ww.workers); i++ { + if ww.workers[i].Pid() == pid { + ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) + // kill worker + _ = wb.Kill() + return } - return nil } - - return nil } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { - ww.mutex.Lock() - defer ww.mutex.Unlock() - ww.stack.Push(w) +func (ww *workerWatcher) Push(w worker.BaseProcess) { + ww.container.Enqueue(w) } -// Destroy all underlying stack (but let them to complete the task) +// Destroy all underlying container (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - // destroy stack, we don't use ww mutex here, since we should be able to push worker - ww.stack.Destroy(ctx) + // destroy container, we don't use ww mutex here, since we should be able to push worker + ww.Lock() + // do not release new workers + ww.container.Destroy() + ww.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + defer tt.Stop() + for { + select { + case <-tt.C: + ww.Lock() + // that might be one of the workers is working + if ww.numWorkers != uint64(len(ww.workers)) { + ww.Unlock() + continue + } + ww.Unlock() + // unnecessary mutex, but + // just to make sure. All container at this moment are in the container + // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.Lock() + for i := 0; i < len(ww.workers); i++ { + ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = ww.workers[i].Kill() + } + return + } + } } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.SyncWorker { - return ww.stack.Workers() +func (ww *workerWatcher) List() []worker.BaseProcess { + ww.Lock() + defer ww.Unlock() + + base := make([]worker.BaseProcess, 0, len(ww.workers)) + for i := 0; i < len(ww.workers); i++ { + base = append(base, ww.workers[i]) + } + + return base } func (ww *workerWatcher) wait(w worker.BaseProcess) { @@ -142,14 +238,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } - _ = ww.stack.FindAndRemoveByPid(w.Pid()) - err = ww.AllocateNew() + ww.Remove(w) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ Event: events.EventPoolError, @@ -158,7 +254,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { +func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { go func() { ww.wait(wb) }() |