Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--  pkg/worker_watcher/container/channel/vec.go  | 107
-rw-r--r--  pkg/worker_watcher/container/queue/queue.go  | 102
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go         | 318
3 files changed, 0 insertions, 527 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
deleted file mode 100644
index 5605f1e0..00000000
--- a/pkg/worker_watcher/container/channel/vec.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package channel
-
-import (
-	"context"
-	"sync"
-	"sync/atomic"
-
-	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Vec struct {
-	sync.RWMutex
-	// destroy signal
-	destroy uint64
-	// channel with the workers
-	workers chan worker.BaseProcess
-}
-
-func NewVector(len uint64) *Vec {
-	vec := &Vec{
-		destroy: 0,
-		workers: make(chan worker.BaseProcess, len),
-	}
-
-	return vec
-}
-
-// Push is O(1) operation
-// In case of TTL and full channel O(n) worst case, where n is len of the channel
-func (v *Vec) Push(w worker.BaseProcess) {
-	// Non-blocking channel send
-	select {
-	case v.workers <- w:
-		// default select branch is only possible when dealing with TTL
-		// because in that case, workers in the v.workers channel can be TTL-ed and killed
-		// but presenting in the channel
-	default:
-		// Stop Pop operations
-		v.Lock()
-		defer v.Unlock()
-
-		/*
-			we can be in the default branch by the following reasons:
-			1. TTL is set with no requests during the TTL
-			2. Violated Get <-> Release operation (how ??)
-		*/
-		for i := 0; i < len(v.workers); i++ {
-			/*
-				We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
-			*/
-			wrk := <-v.workers
-			switch wrk.State().Value() {
-			// skip good states, put worker back
-			case worker.StateWorking, worker.StateReady:
-				// put the worker back
-				// generally, while send and receive operations are concurrent (from the channel), channel behave
-				// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
-				v.workers <- wrk
-				continue
-				/*
-					Bad states are here.
-				*/
-			default:
-				// kill the current worker (just to be sure it's dead)
-				if wrk != nil {
-					_ = wrk.Kill()
-				}
-				// replace with the new one and return from the loop
-				// new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
-				// But this case will be handled in the worker_watcher::Get
-				v.workers <- w
-				return
-			}
-		}
-	}
-}
-
-func (v *Vec) Remove(_ int64) {}
-
-func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
-	/*
-		if *addr == old {
-			*addr = new
-			return true
-		}
-	*/
-
-	if atomic.LoadUint64(&v.destroy) == 1 {
-		return nil, errors.E(errors.WatcherStopped)
-	}
-
-	// used only for the TTL-ed workers
-	v.RLock()
-	defer v.RUnlock()
-
-	select {
-	case w := <-v.workers:
-		return w, nil
-	case <-ctx.Done():
-		return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
-	}
-}
-
-func (v *Vec) Destroy() {
-	atomic.StoreUint64(&v.destroy, 1)
-}
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
deleted file mode 100644
index edf81d60..00000000
--- a/pkg/worker_watcher/container/queue/queue.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package queue
-
-import (
-	"context"
-	"sync"
-	"sync/atomic"
-
-	"github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-const (
-	initialSize          = 1
-	maxInitialSize       = 8
-	maxInternalSliceSize = 10
-)
-
-type Node struct {
-	w []worker.BaseProcess
-	// LL
-	n *Node
-}
-
-type Queue struct {
-	mu sync.Mutex
-
-	head *Node
-	tail *Node
-
-	curr uint64
-	len  uint64
-
-	sliceSize uint64
-}
-
-func NewQueue() *Queue {
-	q := &Queue{
-		mu:        sync.Mutex{},
-		head:      nil,
-		tail:      nil,
-		curr:      0,
-		len:       0,
-		sliceSize: 0,
-	}
-
-	return q
-}
-
-func (q *Queue) Push(w worker.BaseProcess) {
-	q.mu.Lock()
-
-	if q.head == nil {
-		h := newNode(initialSize)
-		q.head = h
-		q.tail = h
-		q.sliceSize = maxInitialSize
-	} else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
-		n := newNode(maxInternalSliceSize)
-		q.tail.n = n
-		q.tail = n
-		q.sliceSize = maxInternalSliceSize
-	}
-
-	q.tail.w = append(q.tail.w, w)
-
-	atomic.AddUint64(&q.len, 1)
-
-	q.mu.Unlock()
-}
-
-func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
-	q.mu.Lock()
-
-	if q.head == nil {
-		return nil, nil
-	}
-
-	w := q.head.w[q.curr]
-	q.head.w[q.curr] = nil
-	atomic.AddUint64(&q.len, ^uint64(0))
-	atomic.AddUint64(&q.curr, 1)
-
-	if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
-		n := q.head.n
-		q.head.n = nil
-		q.head = n
-		q.curr = 0
-	}
-
-	q.mu.Unlock()
-
-	return w, nil
-}
-
-func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
-
-}
-
-func (q *Queue) Destroy() {}
-
-func newNode(capacity int) *Node {
-	return &Node{w: make([]worker.BaseProcess, 0, capacity)}
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
deleted file mode 100755
index 83f8e627..00000000
--- a/pkg/worker_watcher/worker_watcher.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package worker_watcher //nolint:stylecheck
-
-import (
-	"context"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/pkg/events"
-	"github.com/spiral/roadrunner/v2/pkg/worker"
-	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
-	"github.com/spiral/roadrunner/v2/utils"
-)
-
-// Vector interface represents vector container
-type Vector interface {
-	// Push used to put worker to the vector
-	Push(worker.BaseProcess)
-	// Pop used to get worker from the vector
-	Pop(ctx context.Context) (worker.BaseProcess, error)
-	// Remove worker with provided pid
-	Remove(pid int64)
-	// Destroy used to stop releasing the workers
-	Destroy()
-
-	// TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
-	// Replace(prevPid int64, newWorker worker.BaseProcess)
-}
-
-type workerWatcher struct {
-	sync.RWMutex
-	container Vector
-	// used to control Destroy stage (that all workers are in the container)
-	numWorkers *uint64
-
-	workers []worker.BaseProcess
-
-	allocator       worker.Allocator
-	allocateTimeout time.Duration
-	events          events.Handler
-}
-
-// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
-	ww := &workerWatcher{
-		container: channel.NewVector(numWorkers),
-
-		// pass a ptr to the number of workers to avoid blocking in the TTL loop
-		numWorkers:      utils.Uint64(numWorkers),
-		allocateTimeout: allocateTimeout,
-		workers:         make([]worker.BaseProcess, 0, numWorkers),
-
-		allocator: allocator,
-		events:    events,
-	}
-
-	return ww
-}
-
-func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
-	for i := 0; i < len(workers); i++ {
-		ww.container.Push(workers[i])
-		// add worker to watch slice
-		ww.workers = append(ww.workers, workers[i])
-
-		go func(swc worker.BaseProcess) {
-			ww.wait(swc)
-		}(workers[i])
-	}
-	return nil
-}
-
-// Take is not a thread safe operation
-func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
-	const op = errors.Op("worker_watcher_get_free_worker")
-
-	// thread safe operation
-	w, err := ww.container.Pop(ctx)
-	if err != nil {
-		if errors.Is(errors.WatcherStopped, err) {
-			return nil, errors.E(op, errors.WatcherStopped)
-		}
-
-		return nil, errors.E(op, err)
-	}
-
-	// fast path, worker not nil and in the ReadyState
-	if w.State().Value() == worker.StateReady {
-		return w, nil
-	}
-
-	// =========================================================
-	// SLOW PATH
-	_ = w.Kill()
-	// no free workers in the container or worker not in the ReadyState (TTL-ed)
-	// try to continuously get free one
-	for {
-		w, err = ww.container.Pop(ctx)
-		if err != nil {
-			if errors.Is(errors.WatcherStopped, err) {
-				return nil, errors.E(op, errors.WatcherStopped)
-			}
-			return nil, errors.E(op, err)
-		}
-
-		if err != nil {
-			return nil, errors.E(op, err)
-		}
-
-		switch w.State().Value() {
-		// return only workers in the Ready state
-		// check first
-		case worker.StateReady:
-			return w, nil
-		case worker.StateWorking: // how??
-			ww.container.Push(w) // put it back, let worker finish the work
-			continue
-		case
-			// all the possible wrong states
-			worker.StateInactive,
-			worker.StateDestroyed,
-			worker.StateErrored,
-			worker.StateStopped,
-			worker.StateInvalid,
-			worker.StateKilling,
-			worker.StateStopping:
-			// worker doing no work because it in the container
-			// so we can safely kill it (inconsistent state)
-			_ = w.Kill()
-			// try to get new worker
-			continue
-		}
-	}
-}
-
-func (ww *workerWatcher) Allocate() error {
-	const op = errors.Op("worker_watcher_allocate_new")
-
-	sw, err := ww.allocator()
-	if err != nil {
-		// log incident
-		ww.events.Push(
-			events.WorkerEvent{
-				Event:   events.EventWorkerError,
-				Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
-			})
-
-		// if no timeout, return error immediately
-		if ww.allocateTimeout == 0 {
-			return errors.E(op, errors.WorkerAllocate, err)
-		}
-
-		// every half of a second
-		allocateFreq := time.NewTicker(time.Millisecond * 500)
-
-		tt := time.After(ww.allocateTimeout)
-		for {
-			select {
-			case <-tt:
-				// reduce number of workers
-				atomic.AddUint64(ww.numWorkers, ^uint64(0))
-				allocateFreq.Stop()
-				// timeout exceed, worker can't be allocated
-				return errors.E(op, errors.WorkerAllocate, err)
-
-			case <-allocateFreq.C:
-				sw, err = ww.allocator()
-				if err != nil {
-					// log incident
-					ww.events.Push(
-						events.WorkerEvent{
-							Event:   events.EventWorkerError,
-							Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
-						})
-					continue
-				}
-
-				// reallocated
-				allocateFreq.Stop()
-				goto done
-			}
-		}
-	}
-
-done:
-	// add worker to Wait
-	ww.addToWatch(sw)
-
-	ww.Lock()
-	// add new worker to the workers slice (to get information about workers in parallel)
-	ww.workers = append(ww.workers, sw)
-	ww.Unlock()
-
-	// push the worker to the container
-	ww.Release(sw)
-	return nil
-}
-
-// Remove worker
-func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
-	ww.Lock()
-	defer ww.Unlock()
-
-	// set remove state
-	pid := wb.Pid()
-
-	// worker will be removed on the Get operation
-	for i := 0; i < len(ww.workers); i++ {
-		if ww.workers[i].Pid() == pid {
-			ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
-			// kill worker, just to be sure it's dead
-			_ = wb.Kill()
-			return
-		}
-	}
-}
-
-// Release O(1) operation
-func (ww *workerWatcher) Release(w worker.BaseProcess) {
-	switch w.State().Value() {
-	case worker.StateReady:
-		ww.container.Push(w)
-	default:
-		_ = w.Kill()
-	}
-}
-
-// Destroy all underlying container (but let them complete the task)
-func (ww *workerWatcher) Destroy(_ context.Context) {
-	// destroy container, we don't use ww mutex here, since we should be able to push worker
-	ww.Lock()
-	// do not release new workers
-	ww.container.Destroy()
-	ww.Unlock()
-
-	tt := time.NewTicker(time.Millisecond * 100)
-	defer tt.Stop()
-	for { //nolint:gosimple
-		select {
-		case <-tt.C:
-			ww.Lock()
-			// that might be one of the workers is working
-			if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
-				ww.Unlock()
-				continue
-			}
-			// All container at this moment are in the container
-			// Pop operation is blocked, push can't be done, since it's not possible to pop
-			for i := 0; i < len(ww.workers); i++ {
-				ww.workers[i].State().Set(worker.StateDestroyed)
-				// kill the worker
-				_ = ww.workers[i].Kill()
-			}
-			return
-		}
-	}
-}
-
-// List - this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) List() []worker.BaseProcess {
-	ww.RLock()
-	defer ww.RUnlock()
-
-	if len(ww.workers) == 0 {
-		return nil
-	}
-
-	base := make([]worker.BaseProcess, 0, len(ww.workers))
-	for i := 0; i < len(ww.workers); i++ {
-		base = append(base, ww.workers[i])
-	}
-
-	return base
-}
-
-func (ww *workerWatcher) wait(w worker.BaseProcess) {
-	const op = errors.Op("worker_watcher_wait")
-	err := w.Wait()
-	if err != nil {
-		ww.events.Push(events.WorkerEvent{
-			Event:   events.EventWorkerError,
-			Worker:  w,
-			Payload: errors.E(op, err),
-		})
-	}
-
-	// remove worker
-	ww.Remove(w)
-
-	if w.State().Value() == worker.StateDestroyed {
-		// worker was manually destroyed, no need to replace
-		ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
-		return
-	}
-
-	// set state as stopped
-	w.State().Set(worker.StateStopped)
-
-	err = ww.Allocate()
-	if err != nil {
-		ww.events.Push(events.PoolEvent{
-			Event:   events.EventPoolError,
-			Payload: errors.E(op, err),
-		})
-
-		// no workers at all, panic
-		if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
-			panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
-		}
-	}
-}
-
-func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
-	go func() {
-		ww.wait(wb)
-	}()
-}
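
For context, the deleted channel.Vec boils down to a buffered channel used as the free-worker container: Push is non-blocking and drains dead entries when the channel is unexpectedly full, while Pop honours context cancellation. Below is a minimal, self-contained sketch of that pattern, not the RoadRunner code itself: the Worker interface, Ready method, and fakeWorker type are hypothetical stand-ins for worker.BaseProcess and its state constants, and the mutex the real Vec takes around the drain loop is omitted for brevity.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// Worker is a hypothetical stand-in for roadrunner's worker.BaseProcess.
type Worker interface {
	Ready() bool // stand-in for State().Value() == worker.StateReady
	Kill() error
}

// Vec mimics the deleted channel.Vec: a buffered channel acting as the
// free-worker container.
type Vec struct {
	destroyed uint64
	workers   chan Worker
}

func NewVec(capacity uint64) *Vec {
	return &Vec{workers: make(chan Worker, capacity)}
}

// Push returns a worker to the container without blocking. If the channel is
// unexpectedly full (e.g. TTL-ed workers were never popped), one dead worker
// is drained to make room. The real Vec guards this branch with a mutex that
// pauses Pop; that locking is omitted here.
func (v *Vec) Push(w Worker) {
	select {
	case v.workers <- w:
	default:
		for i := 0; i < len(v.workers); i++ {
			wrk := <-v.workers
			if wrk.Ready() {
				v.workers <- wrk // healthy worker: put it back
				continue
			}
			_ = wrk.Kill() // dead worker: replace it with the new one
			v.workers <- w
			return
		}
	}
}

// Pop blocks until a worker is available or the context is done.
func (v *Vec) Pop(ctx context.Context) (Worker, error) {
	if atomic.LoadUint64(&v.destroyed) == 1 {
		return nil, errors.New("container destroyed")
	}
	select {
	case w := <-v.workers:
		return w, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Destroy stops handing out workers on subsequent Pop calls.
func (v *Vec) Destroy() { atomic.StoreUint64(&v.destroyed, 1) }

// fakeWorker is a trivial Worker implementation for the demo.
type fakeWorker struct{ ready bool }

func (f *fakeWorker) Ready() bool { return f.ready }
func (f *fakeWorker) Kill() error { return nil }

func main() {
	vec := NewVec(2)
	vec.Push(&fakeWorker{ready: true})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	w, err := vec.Pop(ctx)
	if err != nil {
		fmt.Println("no worker:", err)
		return
	}
	fmt.Println("got worker, ready:", w.Ready())
}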