diff options
author | Valery Piashchynski <[email protected]> | 2021-07-18 11:32:44 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-18 11:32:44 +0300 |
commit | 9c51360f9119a4114bdcc21c8e61f0908a3c876d (patch) | |
tree | ea63a051931bbd8282d64478bbefa2f970fcc955 /pkg/worker_watcher | |
parent | f4feb30197843d05eb308081ee579d3a9e3d6206 (diff) |
Started beanstalk driver. Add new Queue impl (not finished yet).
Fix bugs in the AMQP, update proto-api
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go (renamed from pkg/worker_watcher/container/vec.go) | 8 | ||||
-rw-r--r-- | pkg/worker_watcher/container/queue/queue.go | 103 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 59 |
3 files changed, 141 insertions, 29 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/channel/vec.go index 24b5fa6d..eafbfb07 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -1,4 +1,4 @@ -package container +package channel import ( "context" @@ -22,11 +22,13 @@ func NewVector(initialNumOfWorkers uint64) *Vec { return vec } -func (v *Vec) Enqueue(w worker.BaseProcess) { +func (v *Vec) Push(w worker.BaseProcess) { v.workers <- w } -func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { +func (v *Vec) Remove(_ int64) {} + +func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { /* if *addr == old { *addr = new diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go new file mode 100644 index 00000000..a792d7c1 --- /dev/null +++ b/pkg/worker_watcher/container/queue/queue.go @@ -0,0 +1,103 @@ +package queue + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +const ( + initialSize = 1 + maxInitialSize = 8 + maxInternalSliceSize = 10 +) + +type Node struct { + w []worker.BaseProcess + // LL + n *Node +} + +type Queue struct { + mu sync.Mutex + + head *Node + tail *Node + + recvCh chan worker.BaseProcess + curr uint64 + len uint64 + + sliceSize uint64 +} + +func NewQueue() *Queue { + q := &Queue{ + mu: sync.Mutex{}, + // w/o buffering + recvCh: make(chan worker.BaseProcess), + head: nil, + tail: nil, + curr: 0, + len: 0, + sliceSize: 0, + } + + return q +} + +func (q *Queue) Push(w worker.BaseProcess) { + q.mu.Lock() + + if q.head == nil { + h := newNode(initialSize) + q.head = h + q.tail = h + q.sliceSize = maxInitialSize + } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { + n := newNode(maxInternalSliceSize) + q.tail.n = n + q.tail = n + q.sliceSize = maxInternalSliceSize + } + + q.tail.w = append(q.tail.w, w) + + atomic.AddUint64(&q.len, 1) + + q.mu.Unlock() +} + +func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { + q.mu.Lock() + + if q.head == nil { + return nil, nil + } + + w := q.head.w[q.curr] + q.head.w[q.curr] = nil + atomic.AddUint64(&q.len, ^uint64(0)) + atomic.AddUint64(&q.curr, 1) + + if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { + n := q.head.n + q.head.n = nil + q.head = n + q.curr = 0 + } + + q.mu.Unlock() + + return w, nil +} + +func (q *Queue) Remove(_ int64) {} + +func (q *Queue) Destroy() {} + +func newNode(capacity int) *Node { + return &Node{w: make([]worker.BaseProcess, 0, capacity)} +} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b2d61d48..6e343fff 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -8,45 +8,51 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" ) // Vector interface represents vector container type Vector interface { - // Enqueue used to put worker to the vector - Enqueue(worker.BaseProcess) - // Dequeue used to get worker from the vector - Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Push used to put worker to the vector + Push(worker.BaseProcess) + // Pop used to get worker from the vector + Pop(ctx context.Context) (worker.BaseProcess, error) + // Remove worker with provided pid + Remove(pid int64) // TODO replace // Destroy used to stop releasing the workers Destroy() } +type workerWatcher struct { + sync.RWMutex + container Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + + workers []worker.BaseProcess + + allocator worker.Allocator + events events.Handler +} + // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ - container: container.NewVector(numWorkers), + container: channel.NewVector(numWorkers), numWorkers: numWorkers, - workers: make([]worker.BaseProcess, 0, numWorkers), - allocator: allocator, - events: events, + + workers: make([]worker.BaseProcess, 0, numWorkers), + + allocator: allocator, + events: events, } return ww } -type workerWatcher struct { - sync.RWMutex - container Vector - // used to control the Destroy stage (that all workers are in the container) - numWorkers uint64 - workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler -} - func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.container.Enqueue(workers[i]) + ww.container.Push(workers[i]) // add worker to watch slice ww.workers = append(ww.workers, workers[i]) @@ -62,7 +68,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation - w, err := ww.container.Dequeue(ctx) + w, err := ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) } @@ -78,11 +84,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { // ========================================================= // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container + _ = w.Kill() + // no free workers in the container or worker not in the ReadyState (TTL-ed) // try to continuously get free one for { - w, err = ww.container.Dequeue(ctx) + w, err = ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) @@ -98,7 +104,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { case worker.StateReady: return w, nil case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work + ww.container.Push(w) // put it back, let worker finish the work continue case // all the possible wrong states @@ -162,7 +168,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { func (ww *workerWatcher) Push(w worker.BaseProcess) { switch w.State().Value() { case worker.StateReady: - ww.container.Enqueue(w) + ww.container.Push(w) default: _ = w.Kill() } @@ -232,6 +238,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { return } + w.State().Set(worker.StateStopped) ww.Remove(w) err = ww.Allocate() if err != nil { |