author    Valery Piashchynski <[email protected]>  2021-07-18 11:32:44 +0300
committer Valery Piashchynski <[email protected]>  2021-07-18 11:32:44 +0300
commit    9c51360f9119a4114bdcc21c8e61f0908a3c876d
tree      ea63a051931bbd8282d64478bbefa2f970fcc955
parent    f4feb30197843d05eb308081ee579d3a9e3d6206
Started the beanstalk driver. Added a new Queue impl (not finished yet).
Fixed bugs in the AMQP driver, updated the proto-api.

Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--  pkg/worker_watcher/container/channel/vec.go (renamed from pkg/worker_watcher/container/vec.go) |   8
-rw-r--r--  pkg/worker_watcher/container/queue/queue.go                                                    | 105
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go                                                           |  59
3 files changed, 143 insertions(+), 29 deletions(-)
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 24b5fa6d..eafbfb07 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -1,4 +1,4 @@
-package container
+package channel
 
 import (
 	"context"
@@ -22,11 +22,13 @@ func NewVector(initialNumOfWorkers uint64) *Vec {
 	return vec
 }
 
-func (v *Vec) Enqueue(w worker.BaseProcess) {
+func (v *Vec) Push(w worker.BaseProcess) {
 	v.workers <- w
 }
 
-func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
 	/*
 		if *addr == old {
 			*addr = new
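
For context, the pattern behind the channel-backed Vec can be sketched standalone as follows. This is an illustrative sketch only, not the roadrunner sources: Proc stands in for worker.BaseProcess, and the error is simplified (the real Pop reports errors.WatcherStopped).

package main

import (
	"context"
	"errors"
	"fmt"
)

// Proc is a minimal stand-in for worker.BaseProcess.
type Proc interface{ Pid() int64 }

type proc struct{ pid int64 }

func (p proc) Pid() int64 { return p.pid }

// Vec buffers workers in a channel sized to the pool,
// so Push never blocks while the pool size is respected.
type Vec struct{ workers chan Proc }

func NewVec(n uint64) *Vec { return &Vec{workers: make(chan Proc, n)} }

// Push returns a worker to the container.
func (v *Vec) Push(p Proc) { v.workers <- p }

// Pop blocks until a worker is available or the context is canceled.
func (v *Vec) Pop(ctx context.Context) (Proc, error) {
	select {
	case p := <-v.workers:
		return p, nil
	case <-ctx.Done():
		return nil, errors.New("watcher stopped")
	}
}

func main() {
	v := NewVec(1)
	v.Push(proc{pid: 1})
	p, _ := v.Pop(context.Background())
	fmt.Println(p.Pid()) // 1
}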
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
new file mode 100644
index 00000000..a792d7c1
--- /dev/null
+++ b/pkg/worker_watcher/container/queue/queue.go
@@ -0,0 +1,105 @@
+package queue
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+
+	"github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+const (
+	initialSize          = 1
+	maxInitialSize       = 8
+	maxInternalSliceSize = 10
+)
+
+type Node struct {
+	w []worker.BaseProcess
+	// n points to the next node of the linked list
+	n *Node
+}
+
+type Queue struct {
+	mu sync.Mutex
+
+	head *Node
+	tail *Node
+
+	recvCh chan worker.BaseProcess
+	curr   uint64
+	len    uint64
+
+	sliceSize uint64
+}
+
+func NewQueue() *Queue {
+	q := &Queue{
+		mu: sync.Mutex{},
+		// w/o buffering
+		recvCh:    make(chan worker.BaseProcess),
+		head:      nil,
+		tail:      nil,
+		curr:      0,
+		len:       0,
+		sliceSize: 0,
+	}
+
+	return q
+}
+
+func (q *Queue) Push(w worker.BaseProcess) {
+	q.mu.Lock()
+
+	if q.head == nil {
+		h := newNode(initialSize)
+		q.head = h
+		q.tail = h
+		q.sliceSize = maxInitialSize
+	} else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
+		n := newNode(maxInternalSliceSize)
+		q.tail.n = n
+		q.tail = n
+		q.sliceSize = maxInternalSliceSize
+	}
+
+	q.tail.w = append(q.tail.w, w)
+
+	atomic.AddUint64(&q.len, 1)
+
+	q.mu.Unlock()
+}
+
+func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
+	q.mu.Lock()
+
+	if q.head == nil {
+		q.mu.Unlock()
+		return nil, nil
+	}
+
+	w := q.head.w[q.curr]
+	q.head.w[q.curr] = nil
+	// adding ^uint64(0) (all bits set) to len is the atomic decrement idiom
+	atomic.AddUint64(&q.len, ^uint64(0))
+	atomic.AddUint64(&q.curr, 1)
+
+	if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
+		n := q.head.n
+		q.head.n = nil
+		q.head = n
+		q.curr = 0
+	}
+
+	q.mu.Unlock()
+
+	return w, nil
+}
+
+func (q *Queue) Remove(_ int64) {}
+
+func (q *Queue) Destroy() {}
+
+func newNode(capacity int) *Node {
+	return &Node{w: make([]worker.BaseProcess, 0, capacity)}
+}
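
The new container is a singly linked list of fixed-capacity slices: the first node starts at capacity 1 and may grow to 8 elements, and every later node holds up to 10 (the constants above). Push appends to the tail node, Pop reads the head node at a moving index and drops drained nodes. Below is a minimal sketch of the same growth policy over plain int values; the names are simplified and this is not the roadrunner API.

package main

import "fmt"

const (
	initialSize          = 1  // capacity hint for the first node
	maxInitialSize       = 8  // the first node may grow up to 8 elements
	maxInternalSliceSize = 10 // every subsequent node holds up to 10
)

type node struct {
	items []int
	next  *node
}

type intQueue struct {
	head, tail *node
	curr       int // read index inside head.items
	threshold  int // max elements in the tail node before linking a new one
}

func (q *intQueue) push(v int) {
	if q.head == nil {
		n := &node{items: make([]int, 0, initialSize)}
		q.head, q.tail = n, n
		q.threshold = maxInitialSize
	} else if len(q.tail.items) >= q.threshold {
		// tail node is full: link a new fixed-size node
		n := &node{items: make([]int, 0, maxInternalSliceSize)}
		q.tail.next = n
		q.tail = n
		q.threshold = maxInternalSliceSize
	}
	q.tail.items = append(q.tail.items, v)
}

func (q *intQueue) pop() (int, bool) {
	if q.head == nil {
		return 0, false
	}
	v := q.head.items[q.curr]
	q.curr++
	if q.curr >= len(q.head.items) {
		// head node drained: drop it and move on
		q.head = q.head.next
		q.curr = 0
	}
	return v, true
}

func main() {
	q := &intQueue{}
	for i := 1; i <= 20; i++ {
		q.push(i)
	}
	for v, ok := q.pop(); ok; v, ok = q.pop() {
		fmt.Print(v, " ") // prints 1 … 20 in FIFO order
	}
	fmt.Println()
}

Compared to the fixed-capacity channel in Vec, this layout can grow without a preset bound, at the cost of a mutex on every operation; the unbuffered recvCh field hints at a planned blocking-Pop path that this commit does not wire up yet.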
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b2d61d48..6e343fff 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -8,45 +8,51 @@ import (
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	"github.com/spiral/roadrunner/v2/pkg/worker"
-	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
+	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
 )
 
 // Vector interface represents vector container
 type Vector interface {
-	// Enqueue used to put worker to the vector
-	Enqueue(worker.BaseProcess)
-	// Dequeue used to get worker from the vector
-	Dequeue(ctx context.Context) (worker.BaseProcess, error)
+	// Push used to put worker to the vector
+	Push(worker.BaseProcess)
+	// Pop used to get worker from the vector
+	Pop(ctx context.Context) (worker.BaseProcess, error)
+	// Remove worker with provided pid
+	Remove(pid int64) // TODO replace
 	// Destroy used to stop releasing the workers
 	Destroy()
 }
 
+type workerWatcher struct {
+	sync.RWMutex
+	container Vector
+	// used to control the Destroy stage (that all workers are in the container)
+	numWorkers uint64
+
+	workers []worker.BaseProcess
+
+	allocator worker.Allocator
+	events    events.Handler
+}
+
 // NewSyncWorkerWatcher is a constructor for the Watcher
 func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
 	ww := &workerWatcher{
-		container:  container.NewVector(numWorkers),
+		container:  channel.NewVector(numWorkers),
 		numWorkers: numWorkers,
-		workers:    make([]worker.BaseProcess, 0, numWorkers),
-		allocator:  allocator,
-		events:     events,
+
+		workers: make([]worker.BaseProcess, 0, numWorkers),
+
+		allocator: allocator,
+		events:    events,
 	}
 
 	return ww
 }
 
-type workerWatcher struct {
-	sync.RWMutex
-	container Vector
-	// used to control the Destroy stage (that all workers are in the container)
-	numWorkers uint64
-	workers    []worker.BaseProcess
-	allocator  worker.Allocator
-	events     events.Handler
-}
-
 func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
 	for i := 0; i < len(workers); i++ {
-		ww.container.Enqueue(workers[i])
+		ww.container.Push(workers[i])
 		// add worker to watch slice
 		ww.workers = append(ww.workers, workers[i])
 
@@ -62,7 +68,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
 	const op = errors.Op("worker_watcher_get_free_worker")
 
 	// thread safe operation
-	w, err := ww.container.Dequeue(ctx)
+	w, err := ww.container.Pop(ctx)
 	if errors.Is(errors.WatcherStopped, err) {
 		return nil, errors.E(op, errors.WatcherStopped)
 	}
@@ -78,11 +84,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
 	// =========================================================
 	// SLOW PATH
-	_ = w.Kill() // how the worker get here???????
-	// no free workers in the container
+	_ = w.Kill()
+	// no free workers in the container or worker not in the ReadyState (TTL-ed)
 	// try to continuously get free one
 	for {
-		w, err = ww.container.Dequeue(ctx)
+		w, err = ww.container.Pop(ctx)
 
 		if errors.Is(errors.WatcherStopped, err) {
 			return nil, errors.E(op, errors.WatcherStopped)
 		}
@@ -98,7 +104,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
 		case worker.StateReady:
 			return w, nil
 		case worker.StateWorking: // how??
-			ww.container.Enqueue(w) // put it back, let worker finish the work
+			ww.container.Push(w) // put it back, let worker finish the work
 			continue
 		case
 		// all the possible wrong states
@@ -162,7 +168,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
 func (ww *workerWatcher) Push(w worker.BaseProcess) {
 	switch w.State().Value() {
 	case worker.StateReady:
-		ww.container.Enqueue(w)
+		ww.container.Push(w)
 	default:
 		_ = w.Kill()
 	}
@@ -232,6 +238,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 		return
 	}
 
+	w.State().Set(worker.StateStopped)
 	ww.Remove(w)
 	err = ww.Allocate()
 	if err != nil {
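
Both containers are expected to satisfy the watcher's Vector interface shown above. A conventional compile-time assertion (hypothetical, not part of this commit) would catch signature drift between them:

// Hypothetical assertions, e.g. at the top of worker_watcher.go;
// they compile to nothing but fail the build on interface drift.
var _ Vector = (*channel.Vec)(nil)
var _ Vector = (*queue.Queue)(nil)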