Diffstat (limited to 'worker_watcher')
-rw-r--r-- | worker_watcher/container/channel/vec.go | 107
-rw-r--r-- | worker_watcher/container/queue/queue.go | 102
-rwxr-xr-x | worker_watcher/worker_watcher.go | 318
3 files changed, 527 insertions, 0 deletions
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go
new file mode 100644
index 00000000..fd50c8d1
--- /dev/null
+++ b/worker_watcher/container/channel/vec.go
@@ -0,0 +1,107 @@
+package channel
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/worker"
+)
+
+type Vec struct {
+	sync.RWMutex
+	// destroy signal
+	destroy uint64
+	// channel with the workers
+	workers chan worker.BaseProcess
+}
+
+func NewVector(len uint64) *Vec {
+	vec := &Vec{
+		destroy: 0,
+		workers: make(chan worker.BaseProcess, len),
+	}
+
+	return vec
+}
+
+// Push is an O(1) operation.
+// In case of TTL and a full channel it is O(n) in the worst case, where n is the len of the channel.
+func (v *Vec) Push(w worker.BaseProcess) {
+	// Non-blocking channel send
+	select {
+	case v.workers <- w:
+		// the default select branch is only possible when dealing with TTL
+		// because in that case, workers in the v.workers channel can be TTL-ed and killed
+		// but still present in the channel
+	default:
+		// Stop Pop operations
+		v.Lock()
+		defer v.Unlock()
+
+		/*
+			we can end up in the default branch for the following reasons:
+			1. TTL is set with no requests during the TTL
+			2. Violated Get <-> Release operation (how ??)
+		*/
+		for i := 0; i < len(v.workers); i++ {
+			/*
+				We need to drain the vector until we find a worker in the Invalid/Killing/Killed/etc. states.
+			*/
+			wrk := <-v.workers
+			switch wrk.State().Value() {
+			// skip good states, put the worker back
+			case worker.StateWorking, worker.StateReady:
+				// put the worker back
+				// generally, while send and receive operations are concurrent (from the channel), the channel behaves
+				// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
+				v.workers <- wrk
+				continue
+			/*
+				Bad states are here.
+			*/
+			default:
+				// kill the current worker (just to be sure it's dead)
+				if wrk != nil {
+					_ = wrk.Kill()
+				}
+				// replace it with the new one and return from the loop
+				// the new worker can be TTL-ed at this moment; it's possible to replace a TTL-ed worker with a new TTL-ed one
+				// But this case will be handled in the worker_watcher::Get
+				v.workers <- w
+				return
+			}
+		}
+	}
+}
+
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
+	/*
+		if *addr == old {
+			*addr = new
+			return true
+		}
+	*/
+
+	if atomic.LoadUint64(&v.destroy) == 1 {
+		return nil, errors.E(errors.WatcherStopped)
+	}
+
+	// used only for the TTL-ed workers
+	v.RLock()
+	defer v.RUnlock()
+
+	select {
+	case w := <-v.workers:
+		return w, nil
+	case <-ctx.Done():
+		return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+	}
+}
+
+func (v *Vec) Destroy() {
+	atomic.StoreUint64(&v.destroy, 1)
+}
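The drain-and-replace branch in Vec.Push is easier to follow in isolation. Below is a minimal, runnable sketch of the same pattern with a simplified element type; the item, state, and vec names are illustrative stand-ins, not the roadrunner worker API. As in the original, if every buffered item turns out to be healthy, the incoming one is silently dropped.

```go
// Minimal sketch of the non-blocking "push with drain" pattern used above.
// The item/state/vec types are simplified placeholders, not roadrunner types.
package main

import "fmt"

type state int

const (
	stateReady state = iota
	stateWorking
	stateInvalid // stands in for all "bad" states (errored, killed, TTL-ed, ...)
)

type item struct {
	pid   int
	state state
}

type vec struct {
	items chan *item
}

// push tries a non-blocking send; if the buffer is full it scans the channel
// once, keeps healthy items, and replaces the first unhealthy one it finds.
func (v *vec) push(it *item) {
	select {
	case v.items <- it:
		return
	default:
		for i := 0; i < len(v.items); i++ {
			old := <-v.items
			switch old.state {
			case stateReady, stateWorking:
				// healthy: put it back and keep scanning
				v.items <- old
			default:
				// unhealthy: drop it and take its slot
				v.items <- it
				return
			}
		}
	}
}

func main() {
	v := &vec{items: make(chan *item, 2)}
	v.push(&item{pid: 1, state: stateReady})
	v.push(&item{pid: 2, state: stateInvalid})
	// buffer is full; pid 2 is unhealthy and gets replaced by pid 3
	v.push(&item{pid: 3, state: stateReady})

	for i := 0; i < len(v.items); i++ {
		fmt.Println((<-v.items).pid) // prints 1, then 3
	}
}
```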
diff --git a/worker_watcher/container/queue/queue.go b/worker_watcher/container/queue/queue.go
new file mode 100644
index 00000000..a274a836
--- /dev/null
+++ b/worker_watcher/container/queue/queue.go
@@ -0,0 +1,102 @@
+package queue
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+
+	"github.com/spiral/roadrunner/v2/worker"
+)
+
+const (
+	initialSize          = 1
+	maxInitialSize       = 8
+	maxInternalSliceSize = 10
+)
+
+type Node struct {
+	w []worker.BaseProcess
+	// LL
+	n *Node
+}
+
+type Queue struct {
+	mu sync.Mutex
+
+	head *Node
+	tail *Node
+
+	curr uint64
+	len  uint64
+
+	sliceSize uint64
+}
+
+func NewQueue() *Queue {
+	q := &Queue{
+		mu:        sync.Mutex{},
+		head:      nil,
+		tail:      nil,
+		curr:      0,
+		len:       0,
+		sliceSize: 0,
+	}
+
+	return q
+}
+
+func (q *Queue) Push(w worker.BaseProcess) {
+	q.mu.Lock()
+
+	if q.head == nil {
+		h := newNode(initialSize)
+		q.head = h
+		q.tail = h
+		q.sliceSize = maxInitialSize
+	} else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
+		n := newNode(maxInternalSliceSize)
+		q.tail.n = n
+		q.tail = n
+		q.sliceSize = maxInternalSliceSize
+	}
+
+	q.tail.w = append(q.tail.w, w)
+
+	atomic.AddUint64(&q.len, 1)
+
+	q.mu.Unlock()
+}
+
+func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
+	q.mu.Lock()
+
+	if q.head == nil {
+		return nil, nil
+	}
+
+	w := q.head.w[q.curr]
+	q.head.w[q.curr] = nil
+	atomic.AddUint64(&q.len, ^uint64(0))
+	atomic.AddUint64(&q.curr, 1)
+
+	if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
+		n := q.head.n
+		q.head.n = nil
+		q.head = n
+		q.curr = 0
+	}
+
+	q.mu.Unlock()
+
+	return w, nil
+}
+
+func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
+
+}
+
+func (q *Queue) Destroy() {}
+
+func newNode(capacity int) *Node {
+	return &Node{w: make([]worker.BaseProcess, 0, capacity)}
+}
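The queue above grows as a linked list of fixed-capacity slices rather than one ever-growing slice. A minimal sketch of the same idea with plain int elements follows; the node, queue, and nodeCap names are illustrative, and the real Queue additionally stores worker.BaseProcess values, tracks its length with atomics, and exposes a context-aware Pop. Using defer for the unlock in the sketch keeps the mutex released even on the early return for an empty queue.

```go
// Minimal sketch of a "linked list of slices" queue with plain int elements.
// Names and sizes are placeholders, not the roadrunner implementation.
package main

import (
	"fmt"
	"sync"
)

const nodeCap = 4 // stands in for maxInternalSliceSize

type node struct {
	items []int
	next  *node
}

type queue struct {
	mu   sync.Mutex
	head *node
	tail *node
	pos  int // read position inside head.items
}

func (q *queue) push(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.head == nil {
		n := &node{items: make([]int, 0, nodeCap)}
		q.head, q.tail = n, n
	} else if len(q.tail.items) >= nodeCap {
		// the tail slice is full: grow the list instead of the slice
		n := &node{items: make([]int, 0, nodeCap)}
		q.tail.next = n
		q.tail = n
	}
	q.tail.items = append(q.tail.items, v)
}

func (q *queue) pop() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock() // defer keeps the lock released on the early return below

	if q.head == nil {
		return 0, false
	}
	v := q.head.items[q.pos]
	q.pos++
	// the head slice is exhausted: advance to the next node
	if q.pos >= len(q.head.items) {
		q.head = q.head.next
		if q.head == nil {
			q.tail = nil
		}
		q.pos = 0
	}
	return v, true
}

func main() {
	q := &queue{}
	for i := 1; i <= 6; i++ {
		q.push(i)
	}
	for {
		v, ok := q.pop()
		if !ok {
			break
		}
		fmt.Print(v, " ") // 1 2 3 4 5 6
	}
	fmt.Println()
}
```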
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
new file mode 100755
index 00000000..78bae778
--- /dev/null
+++ b/worker_watcher/worker_watcher.go
@@ -0,0 +1,318 @@
+package worker_watcher //nolint:stylecheck
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/events"
+	"github.com/spiral/roadrunner/v2/utils"
+	"github.com/spiral/roadrunner/v2/worker"
+	"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
+)
+
+// Vector interface represents a vector container
+type Vector interface {
+	// Push is used to put a worker into the vector
+	Push(worker.BaseProcess)
+	// Pop is used to get a worker from the vector
+	Pop(ctx context.Context) (worker.BaseProcess, error)
+	// Remove removes the worker with the provided pid
+	Remove(pid int64)
+	// Destroy is used to stop releasing the workers
+	Destroy()
+
+	// TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
+	// Replace(prevPid int64, newWorker worker.BaseProcess)
+}
+
+type workerWatcher struct {
+	sync.RWMutex
+	container Vector
+	// used to control the Destroy stage (checks that all workers are back in the container)
+	numWorkers *uint64
+
+	workers []worker.BaseProcess
+
+	allocator       worker.Allocator
+	allocateTimeout time.Duration
+	events          events.Handler
+}
+
+// NewSyncWorkerWatcher is a constructor for the Watcher
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+	ww := &workerWatcher{
+		container: channel.NewVector(numWorkers),
+
+		// pass a ptr to the number of workers to avoid blocking in the TTL loop
+		numWorkers:      utils.Uint64(numWorkers),
+		allocateTimeout: allocateTimeout,
+		workers:         make([]worker.BaseProcess, 0, numWorkers),
+
+		allocator: allocator,
+		events:    events,
+	}
+
+	return ww
+}
+
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
+	for i := 0; i < len(workers); i++ {
+		ww.container.Push(workers[i])
+		// add the worker to the watch slice
+		ww.workers = append(ww.workers, workers[i])
+
+		go func(swc worker.BaseProcess) {
+			ww.wait(swc)
+		}(workers[i])
+	}
+	return nil
+}
+
+// Take is not a thread safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
+	const op = errors.Op("worker_watcher_get_free_worker")
+
+	// thread safe operation
+	w, err := ww.container.Pop(ctx)
+	if err != nil {
+		if errors.Is(errors.WatcherStopped, err) {
+			return nil, errors.E(op, errors.WatcherStopped)
+		}
+
+		return nil, errors.E(op, err)
+	}
+
+	// fast path, worker is not nil and in the ReadyState
+	if w.State().Value() == worker.StateReady {
+		return w, nil
+	}
+
+	// =========================================================
+	// SLOW PATH
+	_ = w.Kill()
+	// no free workers in the container or the worker is not in the ReadyState (TTL-ed)
+	// try to continuously get a free one
+	for {
+		w, err = ww.container.Pop(ctx)
+		if err != nil {
+			if errors.Is(errors.WatcherStopped, err) {
+				return nil, errors.E(op, errors.WatcherStopped)
+			}
+			return nil, errors.E(op, err)
+		}
+
+		if err != nil {
+			return nil, errors.E(op, err)
+		}
+
+		switch w.State().Value() {
+		// return only workers in the Ready state
+		// check first
+		case worker.StateReady:
+			return w, nil
+		case worker.StateWorking: // how??
+			ww.container.Push(w) // put it back, let the worker finish its work
+			continue
+		case
+			// all the possible wrong states
+			worker.StateInactive,
+			worker.StateDestroyed,
+			worker.StateErrored,
+			worker.StateStopped,
+			worker.StateInvalid,
+			worker.StateKilling,
+			worker.StateStopping:
+			// the worker is doing no work because it is in the container,
+			// so we can safely kill it (inconsistent state)
+			_ = w.Kill()
+			// try to get a new worker
+			continue
+		}
+	}
+}
+
+func (ww *workerWatcher) Allocate() error {
+	const op = errors.Op("worker_watcher_allocate_new")
+
+	sw, err := ww.allocator()
+	if err != nil {
+		// log the incident
+		ww.events.Push(
+			events.WorkerEvent{
+				Event:   events.EventWorkerError,
+				Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+			})
+
+		// if there is no timeout, return the error immediately
+		if ww.allocateTimeout == 0 {
+			return errors.E(op, errors.WorkerAllocate, err)
+		}
+
+		// retry every half a second
+		allocateFreq := time.NewTicker(time.Millisecond * 500)
+
+		tt := time.After(ww.allocateTimeout)
+		for {
+			select {
+			case <-tt:
+				// reduce the number of workers
+				atomic.AddUint64(ww.numWorkers, ^uint64(0))
+				allocateFreq.Stop()
+				// timeout exceeded, the worker can't be allocated
+				return errors.E(op, errors.WorkerAllocate, err)
+
+			case <-allocateFreq.C:
+				sw, err = ww.allocator()
+				if err != nil {
+					// log the incident
+					ww.events.Push(
+						events.WorkerEvent{
+							Event:   events.EventWorkerError,
+							Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+						})
+					continue
+				}
+
+				// reallocated
+				allocateFreq.Stop()
+				goto done
+			}
+		}
+	}
+
+done:
+	// start waiting on the worker
+	ww.addToWatch(sw)
+
+	ww.Lock()
+	// add the new worker to the workers slice (to get information about workers in parallel)
+	ww.workers = append(ww.workers, sw)
+	ww.Unlock()
+
+	// push the worker to the container
+	ww.Release(sw)
+	return nil
+}
+
+// Remove a worker
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+	ww.Lock()
+	defer ww.Unlock()
+
+	// set remove state
+	pid := wb.Pid()
+
+	// the worker will be removed on the Get operation
+	for i := 0; i < len(ww.workers); i++ {
+		if ww.workers[i].Pid() == pid {
+			ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+			// kill the worker, just to be sure it's dead
+			_ = wb.Kill()
+			return
+		}
+	}
+}
+
+// Release is an O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
+	switch w.State().Value() {
+	case worker.StateReady:
+		ww.container.Push(w)
+	default:
+		_ = w.Kill()
+	}
+}
+
+// Destroy stops all underlying workers (but lets them complete the current task)
+func (ww *workerWatcher) Destroy(_ context.Context) {
+	// destroy the container; we don't use the ww mutex here, since we should be able to push workers
+	ww.Lock()
+	// do not release new workers
+	ww.container.Destroy()
+	ww.Unlock()
+
+	tt := time.NewTicker(time.Millisecond * 100)
+	defer tt.Stop()
+	for { //nolint:gosimple
+		select {
+		case <-tt.C:
+			ww.Lock()
+			// one of the workers might still be working
+			if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
+				ww.Unlock()
+				continue
+			}
+			// all workers at this moment are in the container
+			// the Pop operation is blocked, and a push can't be done since it's not possible to pop
+			for i := 0; i < len(ww.workers); i++ {
+				ww.workers[i].State().Set(worker.StateDestroyed)
+				// kill the worker
+				_ = ww.workers[i].Kill()
+			}
+			return
+		}
+	}
+}
+
+// List is an O(n) operation; it returns a copy of the actual workers
+func (ww *workerWatcher) List() []worker.BaseProcess {
+	ww.RLock()
+	defer ww.RUnlock()
+
+	if len(ww.workers) == 0 {
+		return nil
+	}
+
+	base := make([]worker.BaseProcess, 0, len(ww.workers))
+	for i := 0; i < len(ww.workers); i++ {
+		base = append(base, ww.workers[i])
+	}
+
+	return base
+}
+
+func (ww *workerWatcher) wait(w worker.BaseProcess) {
+	const op = errors.Op("worker_watcher_wait")
+	err := w.Wait()
+	if err != nil {
+		ww.events.Push(events.WorkerEvent{
+			Event:   events.EventWorkerError,
+			Worker:  w,
+			Payload: errors.E(op, err),
+		})
+	}
+
+	// remove the worker
+	ww.Remove(w)
+
+	if w.State().Value() == worker.StateDestroyed {
+		// the worker was manually destroyed, no need to replace it
+		ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+		return
+	}
+
+	// set the state to stopped
+	w.State().Set(worker.StateStopped)
+
+	err = ww.Allocate()
+	if err != nil {
+		ww.events.Push(events.PoolEvent{
+			Event: events.EventWorkerProcessExit,
+			Error: errors.E(op, err),
+		})
+
+		// no workers at all, panic
+		if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+			panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+		}
+	}
+}
+
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
+	go func() {
+		ww.wait(wb)
+	}()
+}
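Allocate() above follows a try-once, then retry-on-a-ticker-until-a-deadline policy. The sketch below isolates that pattern using only the standard library; allocateWithRetry, alloc, and the durations are placeholders rather than the roadrunner allocator API.

```go
// Self-contained sketch of the retry-until-timeout allocation pattern.
// The allocate function and the durations are placeholders.
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNoResources = errors.New("no resources")

// allocateWithRetry tries alloc once, then retries every `every` until `timeout` elapses.
func allocateWithRetry(alloc func() (int, error), every, timeout time.Duration) (int, error) {
	v, err := alloc()
	if err == nil {
		return v, nil
	}

	tick := time.NewTicker(every)
	defer tick.Stop()
	deadline := time.After(timeout)

	for {
		select {
		case <-deadline:
			// give up; the caller decides how to account for the lost worker
			return 0, fmt.Errorf("allocate timeout: %w", err)
		case <-tick.C:
			v, err = alloc()
			if err != nil {
				continue // log-and-retry in the real code
			}
			return v, nil
		}
	}
}

func main() {
	attempts := 0
	alloc := func() (int, error) {
		attempts++
		if attempts < 3 {
			return 0, errNoResources
		}
		return 42, nil // "worker" allocated on the third attempt
	}

	v, err := allocateWithRetry(alloc, 10*time.Millisecond, time.Second)
	fmt.Println(v, err, "after", attempts, "attempts") // 42 <nil> after 3 attempts
}
```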
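Destroy() relies on a similar polling idea for shutdown: stop handing out workers, then tick until every owned worker has been returned before killing them. Here is a reduced sketch of that wait loop with placeholder counters instead of the real container and worker types.

```go
// Sketch of a "wait until all workers are back, then destroy" loop.
// The pool type and its counters are simplified placeholders.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type pool struct {
	total    uint64 // workers the pool owns
	returned uint64 // workers currently back in the container
}

// destroy polls until all owned workers are back, then reports it is safe to kill them.
func (p *pool) destroy() {
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for range tick.C {
		if atomic.LoadUint64(&p.returned) != atomic.LoadUint64(&p.total) {
			continue // someone is still working
		}
		fmt.Println("all workers returned; safe to destroy them")
		return
	}
}

func main() {
	p := &pool{total: 2, returned: 1}

	// simulate the last busy worker finishing after 50ms
	go func() {
		time.Sleep(50 * time.Millisecond)
		atomic.AddUint64(&p.returned, 1)
	}()

	p.destroy()
}
```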