summaryrefslogtreecommitdiff
path: root/worker_watcher/container
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 21:46:50 +0300
committerGitHub <[email protected]>2021-09-16 21:46:50 +0300
commit3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch)
treee723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /worker_watcher/container
parent337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff)
parent823d831b57b75f70c7c3bbbee355f2016633bb3b (diff)
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'worker_watcher/container')
-rw-r--r--worker_watcher/container/channel/vec.go107
-rw-r--r--worker_watcher/container/queue/queue.go102
2 files changed, 209 insertions, 0 deletions
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go
new file mode 100644
index 00000000..fd50c8d1
--- /dev/null
+++ b/worker_watcher/container/channel/vec.go
@@ -0,0 +1,107 @@
+package channel
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/worker"
+)
+
+type Vec struct {
+ sync.RWMutex
+ // destroy signal
+ destroy uint64
+ // channel with the workers
+ workers chan worker.BaseProcess
+}
+
+func NewVector(len uint64) *Vec {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, len),
+ }
+
+ return vec
+}
+
+// Push is O(1) operation
+// In case of TTL and full channel O(n) worst case, where n is len of the channel
+func (v *Vec) Push(w worker.BaseProcess) {
+ // Non-blocking channel send
+ select {
+ case v.workers <- w:
+ // default select branch is only possible when dealing with TTL
+ // because in that case, workers in the v.workers channel can be TTL-ed and killed
+ // but presenting in the channel
+ default:
+ // Stop Pop operations
+ v.Lock()
+ defer v.Unlock()
+
+ /*
+ we can be in the default branch by the following reasons:
+ 1. TTL is set with no requests during the TTL
+ 2. Violated Get <-> Release operation (how ??)
+ */
+ for i := 0; i < len(v.workers); i++ {
+ /*
+ We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
+ */
+ wrk := <-v.workers
+ switch wrk.State().Value() {
+ // skip good states, put worker back
+ case worker.StateWorking, worker.StateReady:
+ // put the worker back
+ // generally, while send and receive operations are concurrent (from the channel), channel behave
+ // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
+ v.workers <- wrk
+ continue
+ /*
+ Bad states are here.
+ */
+ default:
+ // kill the current worker (just to be sure it's dead)
+ if wrk != nil {
+ _ = wrk.Kill()
+ }
+ // replace with the new one and return from the loop
+ // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
+ // But this case will be handled in the worker_watcher::Get
+ v.workers <- w
+ return
+ }
+ }
+ }
+}
+
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ /*
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+
+ if atomic.LoadUint64(&v.destroy) == 1 {
+ return nil, errors.E(errors.WatcherStopped)
+ }
+
+ // used only for the TTL-ed workers
+ v.RLock()
+ defer v.RUnlock()
+
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}
diff --git a/worker_watcher/container/queue/queue.go b/worker_watcher/container/queue/queue.go
new file mode 100644
index 00000000..a274a836
--- /dev/null
+++ b/worker_watcher/container/queue/queue.go
@@ -0,0 +1,102 @@
+package queue
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/worker"
+)
+
+const (
+ initialSize = 1
+ maxInitialSize = 8
+ maxInternalSliceSize = 10
+)
+
+type Node struct {
+ w []worker.BaseProcess
+ // LL
+ n *Node
+}
+
+type Queue struct {
+ mu sync.Mutex
+
+ head *Node
+ tail *Node
+
+ curr uint64
+ len uint64
+
+ sliceSize uint64
+}
+
+func NewQueue() *Queue {
+ q := &Queue{
+ mu: sync.Mutex{},
+ head: nil,
+ tail: nil,
+ curr: 0,
+ len: 0,
+ sliceSize: 0,
+ }
+
+ return q
+}
+
+func (q *Queue) Push(w worker.BaseProcess) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ h := newNode(initialSize)
+ q.head = h
+ q.tail = h
+ q.sliceSize = maxInitialSize
+ } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
+ n := newNode(maxInternalSliceSize)
+ q.tail.n = n
+ q.tail = n
+ q.sliceSize = maxInternalSliceSize
+ }
+
+ q.tail.w = append(q.tail.w, w)
+
+ atomic.AddUint64(&q.len, 1)
+
+ q.mu.Unlock()
+}
+
+func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ return nil, nil
+ }
+
+ w := q.head.w[q.curr]
+ q.head.w[q.curr] = nil
+ atomic.AddUint64(&q.len, ^uint64(0))
+ atomic.AddUint64(&q.curr, 1)
+
+ if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
+ n := q.head.n
+ q.head.n = nil
+ q.head = n
+ q.curr = 0
+ }
+
+ q.mu.Unlock()
+
+ return w, nil
+}
+
+func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
+
+}
+
+func (q *Queue) Destroy() {}
+
+func newNode(capacity int) *Node {
+ return &Node{w: make([]worker.BaseProcess, 0, capacity)}
+}