diff options
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 99 | ||||
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 17 | ||||
-rw-r--r-- | pkg/worker_watcher/container/queue/queue.go | 102 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 51 |
4 files changed, 201 insertions, 68 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go new file mode 100644 index 00000000..51093978 --- /dev/null +++ b/pkg/worker_watcher/container/channel/vec.go @@ -0,0 +1,99 @@ +package channel + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + sync.RWMutex + // destroy signal + destroy uint64 + // channel with the workers + workers chan worker.BaseProcess + + len uint64 +} + +func NewVector(len uint64) *Vec { + vec := &Vec{ + destroy: 0, + len: len, + workers: make(chan worker.BaseProcess, len), + } + + return vec +} + +// Push is O(1) operation +// In case of TTL and full channel O(n) worst case, where n is len of the channel +func (v *Vec) Push(w worker.BaseProcess) { + // Non-blocking channel send + select { + case v.workers <- w: + // default select branch is only possible when dealing with TTL + // because in that case, workers in the v.workers channel can be TTL-ed and killed + // but presenting in the channel + default: + v.Lock() + defer v.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) + */ + for i := uint64(0); i < v.len; i++ { + wrk := <-v.workers + switch wrk.State().Value() { + // skip good states + case worker.StateWorking, worker.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + v.workers <- wrk + continue + default: + // kill the current worker (just to be sure it's dead) + _ = wrk.Kill() + // replace with the new one + v.workers <- w + return + } + } + } +} + +func (v *Vec) Remove(_ int64) {} + +func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { + /* + if *addr == old { + *addr = new + return true + } + */ + + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + return nil, errors.E(errors.WatcherStopped) + } + + // used only for the TTL-ed workers + v.RLock() + defer v.RUnlock() + + select { + case w := <-v.workers: + return w, nil + case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) + } +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go deleted file mode 100644 index e10ecdae..00000000 --- a/pkg/worker_watcher/container/interface.go +++ /dev/null @@ -1,17 +0,0 @@ -package container - -import ( - "context" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// Vector interface represents vector container -type Vector interface { - // Enqueue used to put worker to the vector - Enqueue(worker.BaseProcess) - // Dequeue used to get worker from the vector - Dequeue(ctx context.Context) (worker.BaseProcess, error) - // Destroy used to stop releasing the workers - Destroy() -} diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go new file mode 100644 index 00000000..edf81d60 --- /dev/null +++ b/pkg/worker_watcher/container/queue/queue.go @@ -0,0 +1,102 @@ +package queue + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +const ( + initialSize = 1 + maxInitialSize = 8 + maxInternalSliceSize = 10 +) + +type Node struct { + w []worker.BaseProcess + // LL + n *Node +} + +type Queue struct { + mu sync.Mutex + + head *Node + tail *Node + + curr uint64 + len uint64 + + sliceSize uint64 +} + +func NewQueue() *Queue { + q := &Queue{ + mu: sync.Mutex{}, + head: nil, + tail: nil, + curr: 0, + len: 0, + sliceSize: 0, + } + + return q +} + +func (q *Queue) Push(w worker.BaseProcess) { + q.mu.Lock() + + if q.head == nil { + h := newNode(initialSize) + q.head = h + q.tail = h + q.sliceSize = maxInitialSize + } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { + n := newNode(maxInternalSliceSize) + q.tail.n = n + q.tail = n + q.sliceSize = maxInternalSliceSize + } + + q.tail.w = append(q.tail.w, w) + + atomic.AddUint64(&q.len, 1) + + q.mu.Unlock() +} + +func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { + q.mu.Lock() + + if q.head == nil { + return nil, nil + } + + w := q.head.w[q.curr] + q.head.w[q.curr] = nil + atomic.AddUint64(&q.len, ^uint64(0)) + atomic.AddUint64(&q.curr, 1) + + if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { + n := q.head.n + q.head.n = nil + q.head = n + q.curr = 0 + } + + q.mu.Unlock() + + return w, nil +} + +func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) { + +} + +func (q *Queue) Destroy() {} + +func newNode(capacity int) *Node { + return &Node{w: make([]worker.BaseProcess, 0, capacity)} +} diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go deleted file mode 100644 index 24b5fa6d..00000000 --- a/pkg/worker_watcher/container/vec.go +++ /dev/null @@ -1,51 +0,0 @@ -package container - -import ( - "context" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -type Vec struct { - destroy uint64 - workers chan worker.BaseProcess -} - -func NewVector(initialNumOfWorkers uint64) *Vec { - vec := &Vec{ - destroy: 0, - workers: make(chan worker.BaseProcess, initialNumOfWorkers), - } - - return vec -} - -func (v *Vec) Enqueue(w worker.BaseProcess) { - v.workers <- w -} - -func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { - /* - if *addr == old { - *addr = new - return true - } - */ - - if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { - return nil, errors.E(errors.WatcherStopped) - } - - select { - case w := <-v.workers: - return w, nil - case <-ctx.Done(): - return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) - } -} - -func (v *Vec) Destroy() { - atomic.StoreUint64(&v.destroy, 1) -} |