diff options
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 107 | ||||
-rw-r--r-- | pkg/worker_watcher/container/queue/queue.go | 102 |
2 files changed, 0 insertions, 209 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go deleted file mode 100644 index 5605f1e0..00000000 --- a/pkg/worker_watcher/container/channel/vec.go +++ /dev/null @@ -1,107 +0,0 @@ -package channel - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -type Vec struct { - sync.RWMutex - // destroy signal - destroy uint64 - // channel with the workers - workers chan worker.BaseProcess -} - -func NewVector(len uint64) *Vec { - vec := &Vec{ - destroy: 0, - workers: make(chan worker.BaseProcess, len), - } - - return vec -} - -// Push is O(1) operation -// In case of TTL and full channel O(n) worst case, where n is len of the channel -func (v *Vec) Push(w worker.BaseProcess) { - // Non-blocking channel send - select { - case v.workers <- w: - // default select branch is only possible when dealing with TTL - // because in that case, workers in the v.workers channel can be TTL-ed and killed - // but presenting in the channel - default: - // Stop Pop operations - v.Lock() - defer v.Unlock() - - /* - we can be in the default branch by the following reasons: - 1. TTL is set with no requests during the TTL - 2. Violated Get <-> Release operation (how ??) - */ - for i := 0; i < len(v.workers); i++ { - /* - We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. - */ - wrk := <-v.workers - switch wrk.State().Value() { - // skip good states, put worker back - case worker.StateWorking, worker.StateReady: - // put the worker back - // generally, while send and receive operations are concurrent (from the channel), channel behave - // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO - v.workers <- wrk - continue - /* - Bad states are here. - */ - default: - // kill the current worker (just to be sure it's dead) - if wrk != nil { - _ = wrk.Kill() - } - // replace with the new one and return from the loop - // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker - // But this case will be handled in the worker_watcher::Get - v.workers <- w - return - } - } - } -} - -func (v *Vec) Remove(_ int64) {} - -func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { - /* - if *addr == old { - *addr = new - return true - } - */ - - if atomic.LoadUint64(&v.destroy) == 1 { - return nil, errors.E(errors.WatcherStopped) - } - - // used only for the TTL-ed workers - v.RLock() - defer v.RUnlock() - - select { - case w := <-v.workers: - return w, nil - case <-ctx.Done(): - return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) - } -} - -func (v *Vec) Destroy() { - atomic.StoreUint64(&v.destroy, 1) -} diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go deleted file mode 100644 index edf81d60..00000000 --- a/pkg/worker_watcher/container/queue/queue.go +++ /dev/null @@ -1,102 +0,0 @@ -package queue - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -const ( - initialSize = 1 - maxInitialSize = 8 - maxInternalSliceSize = 10 -) - -type Node struct { - w []worker.BaseProcess - // LL - n *Node -} - -type Queue struct { - mu sync.Mutex - - head *Node - tail *Node - - curr uint64 - len uint64 - - sliceSize uint64 -} - -func NewQueue() *Queue { - q := &Queue{ - mu: sync.Mutex{}, - head: nil, - tail: nil, - curr: 0, - len: 0, - sliceSize: 0, - } - - return q -} - -func (q *Queue) Push(w worker.BaseProcess) { - q.mu.Lock() - - if q.head == nil { - h := newNode(initialSize) - q.head = h - q.tail = h - q.sliceSize = maxInitialSize - } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { - n := newNode(maxInternalSliceSize) - q.tail.n = n - q.tail = n - q.sliceSize = maxInternalSliceSize - } - - q.tail.w = append(q.tail.w, w) - - atomic.AddUint64(&q.len, 1) - - q.mu.Unlock() -} - -func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { - q.mu.Lock() - - if q.head == nil { - return nil, nil - } - - w := q.head.w[q.curr] - q.head.w[q.curr] = nil - atomic.AddUint64(&q.len, ^uint64(0)) - atomic.AddUint64(&q.curr, 1) - - if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { - n := q.head.n - q.head.n = nil - q.head = n - q.curr = 0 - } - - q.mu.Unlock() - - return w, nil -} - -func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) { - -} - -func (q *Queue) Destroy() {} - -func newNode(capacity int) *Node { - return &Node{w: make([]worker.BaseProcess, 0, capacity)} -} |