summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go107
-rw-r--r--pkg/worker_watcher/container/queue/queue.go102
2 files changed, 0 insertions, 209 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
deleted file mode 100644
index 5605f1e0..00000000
--- a/pkg/worker_watcher/container/channel/vec.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package channel
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Vec struct {
- sync.RWMutex
- // destroy signal
- destroy uint64
- // channel with the workers
- workers chan worker.BaseProcess
-}
-
-func NewVector(len uint64) *Vec {
- vec := &Vec{
- destroy: 0,
- workers: make(chan worker.BaseProcess, len),
- }
-
- return vec
-}
-
-// Push is O(1) operation
-// In case of TTL and full channel O(n) worst case, where n is len of the channel
-func (v *Vec) Push(w worker.BaseProcess) {
- // Non-blocking channel send
- select {
- case v.workers <- w:
- // default select branch is only possible when dealing with TTL
- // because in that case, workers in the v.workers channel can be TTL-ed and killed
- // but presenting in the channel
- default:
- // Stop Pop operations
- v.Lock()
- defer v.Unlock()
-
- /*
- we can be in the default branch by the following reasons:
- 1. TTL is set with no requests during the TTL
- 2. Violated Get <-> Release operation (how ??)
- */
- for i := 0; i < len(v.workers); i++ {
- /*
- We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
- */
- wrk := <-v.workers
- switch wrk.State().Value() {
- // skip good states, put worker back
- case worker.StateWorking, worker.StateReady:
- // put the worker back
- // generally, while send and receive operations are concurrent (from the channel), channel behave
- // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
- v.workers <- wrk
- continue
- /*
- Bad states are here.
- */
- default:
- // kill the current worker (just to be sure it's dead)
- if wrk != nil {
- _ = wrk.Kill()
- }
- // replace with the new one and return from the loop
- // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
- // But this case will be handled in the worker_watcher::Get
- v.workers <- w
- return
- }
- }
- }
-}
-
-func (v *Vec) Remove(_ int64) {}
-
-func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
- /*
- if *addr == old {
- *addr = new
- return true
- }
- */
-
- if atomic.LoadUint64(&v.destroy) == 1 {
- return nil, errors.E(errors.WatcherStopped)
- }
-
- // used only for the TTL-ed workers
- v.RLock()
- defer v.RUnlock()
-
- select {
- case w := <-v.workers:
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
- }
-}
-
-func (v *Vec) Destroy() {
- atomic.StoreUint64(&v.destroy, 1)
-}
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
deleted file mode 100644
index edf81d60..00000000
--- a/pkg/worker_watcher/container/queue/queue.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package queue
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-const (
- initialSize = 1
- maxInitialSize = 8
- maxInternalSliceSize = 10
-)
-
-type Node struct {
- w []worker.BaseProcess
- // LL
- n *Node
-}
-
-type Queue struct {
- mu sync.Mutex
-
- head *Node
- tail *Node
-
- curr uint64
- len uint64
-
- sliceSize uint64
-}
-
-func NewQueue() *Queue {
- q := &Queue{
- mu: sync.Mutex{},
- head: nil,
- tail: nil,
- curr: 0,
- len: 0,
- sliceSize: 0,
- }
-
- return q
-}
-
-func (q *Queue) Push(w worker.BaseProcess) {
- q.mu.Lock()
-
- if q.head == nil {
- h := newNode(initialSize)
- q.head = h
- q.tail = h
- q.sliceSize = maxInitialSize
- } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
- n := newNode(maxInternalSliceSize)
- q.tail.n = n
- q.tail = n
- q.sliceSize = maxInternalSliceSize
- }
-
- q.tail.w = append(q.tail.w, w)
-
- atomic.AddUint64(&q.len, 1)
-
- q.mu.Unlock()
-}
-
-func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
- q.mu.Lock()
-
- if q.head == nil {
- return nil, nil
- }
-
- w := q.head.w[q.curr]
- q.head.w[q.curr] = nil
- atomic.AddUint64(&q.len, ^uint64(0))
- atomic.AddUint64(&q.curr, 1)
-
- if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
- n := q.head.n
- q.head.n = nil
- q.head = n
- q.curr = 0
- }
-
- q.mu.Unlock()
-
- return w, nil
-}
-
-func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
-
-}
-
-func (q *Queue) Destroy() {}
-
-func newNode(capacity int) *Node {
- return &Node{w: make([]worker.BaseProcess, 0, capacity)}
-}