diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /worker_watcher/container/queue/queue.go | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'worker_watcher/container/queue/queue.go')
-rw-r--r-- | worker_watcher/container/queue/queue.go | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/worker_watcher/container/queue/queue.go b/worker_watcher/container/queue/queue.go new file mode 100644 index 00000000..a274a836 --- /dev/null +++ b/worker_watcher/container/queue/queue.go @@ -0,0 +1,102 @@ +package queue + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/roadrunner/v2/worker" +) + +const ( + initialSize = 1 + maxInitialSize = 8 + maxInternalSliceSize = 10 +) + +type Node struct { + w []worker.BaseProcess + // LL + n *Node +} + +type Queue struct { + mu sync.Mutex + + head *Node + tail *Node + + curr uint64 + len uint64 + + sliceSize uint64 +} + +func NewQueue() *Queue { + q := &Queue{ + mu: sync.Mutex{}, + head: nil, + tail: nil, + curr: 0, + len: 0, + sliceSize: 0, + } + + return q +} + +func (q *Queue) Push(w worker.BaseProcess) { + q.mu.Lock() + + if q.head == nil { + h := newNode(initialSize) + q.head = h + q.tail = h + q.sliceSize = maxInitialSize + } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { + n := newNode(maxInternalSliceSize) + q.tail.n = n + q.tail = n + q.sliceSize = maxInternalSliceSize + } + + q.tail.w = append(q.tail.w, w) + + atomic.AddUint64(&q.len, 1) + + q.mu.Unlock() +} + +func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { + q.mu.Lock() + + if q.head == nil { + return nil, nil + } + + w := q.head.w[q.curr] + q.head.w[q.curr] = nil + atomic.AddUint64(&q.len, ^uint64(0)) + atomic.AddUint64(&q.curr, 1) + + if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { + n := q.head.n + q.head.n = nil + q.head = n + q.curr = 0 + } + + q.mu.Unlock() + + return w, nil +} + +func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) { + +} + +func (q *Queue) Destroy() {} + +func newNode(capacity int) *Node { + return &Node{w: make([]worker.BaseProcess, 0, capacity)} +} |