Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--   pkg/worker_watcher/container/channel/vec.go    99
-rw-r--r--   pkg/worker_watcher/container/interface.go      17
-rw-r--r--   pkg/worker_watcher/container/queue/queue.go   102
-rw-r--r--   pkg/worker_watcher/container/vec.go            51
-rwxr-xr-x   pkg/worker_watcher/worker_watcher.go           78
5 files changed, 247 insertions, 100 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
new file mode 100644
index 00000000..51093978
--- /dev/null
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -0,0 +1,99 @@
+package channel
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ sync.RWMutex
+ // destroy signal
+ destroy uint64
+ // channel with the workers
+ workers chan worker.BaseProcess
+
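+ // channel capacity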
+ len uint64
+}
+
+func NewVector(len uint64) *Vec {
+ vec := &Vec{
+ destroy: 0,
+ len: len,
+ workers: make(chan worker.BaseProcess, len),
+ }
+
+ return vec
+}
+
+// Push is an O(1) operation.
+// With TTL enabled and a full channel it is O(n) in the worst case, where n is the channel capacity.
+func (v *Vec) Push(w worker.BaseProcess) {
+ // Non-blocking channel send
+ select {
+ case v.workers <- w:
+ // the default branch is only possible when dealing with TTL,
+ // because in that case workers in the v.workers channel can be TTL-ed and killed
+ // while still being present in the channel
+ default:
+ v.Lock()
+ defer v.Unlock()
+
+ /*
+ we can end up in the default branch for the following reasons:
+ 1. TTL is set and there were no requests during the TTL period
+ 2. a violated Get <-> Release sequence (how?)
+ */
+ for i := uint64(0); i < v.len; i++ {
+ wrk := <-v.workers
+ switch wrk.State().Value() {
+ // skip good states
+ case worker.StateWorking, worker.StateReady:
+ // put the worker back
+ // generally, when send and receive operations are concurrent (from the channel), the channel behaves
+ // like a FIFO, but when re-sending from the same goroutine it behaves like a LIFO
+ v.workers <- wrk
+ continue
+ default:
+ // kill the current worker (just to be sure it's dead)
+ _ = wrk.Kill()
+ // replace with the new one
+ v.workers <- w
+ return
+ }
+ }
+ }
+}
+
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ /*
+ CompareAndSwap semantics:
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ so CAS(&v.destroy, 1, 1) below is an atomic check of whether the destroy flag has been set
+ */
+
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, errors.E(errors.WatcherStopped)
+ }
+
+ // used only for the TTL-ed workers
+ v.RLock()
+ defer v.RUnlock()
+
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}
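The channel-backed Vec above keeps workers in a buffered channel: Push tries a non-blocking send and, once the channel is full (possible only when TTL-ed workers linger in it), rescans the buffer and replaces a dead worker; Pop honours the destroy flag and the caller's context. Below is a minimal, self-contained sketch of the same non-blocking-send-plus-rescan pattern; the item type, its alive flag, and all names are illustrative stand-ins for worker.BaseProcess and its state checks, not the package's API.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type item struct {
    id    int
    alive bool
}

type vec struct {
    items chan *item
    size  uint64
}

func newVec(size uint64) *vec {
    return &vec{items: make(chan *item, size), size: size}
}

// push is O(1) in the common case; when the channel is full it rescans the
// buffer once and evicts a dead item to make room (O(n) worst case).
func (v *vec) push(it *item) {
    select {
    case v.items <- it:
    default:
        for i := uint64(0); i < v.size; i++ {
            cur := <-v.items
            if cur.alive {
                v.items <- cur // keep healthy items in the buffer
                continue
            }
            v.items <- it // replace the dead item and stop
            return
        }
    }
}

// pop blocks until an item is available or the context expires.
func (v *vec) pop(ctx context.Context) (*item, error) {
    select {
    case it := <-v.items:
        return it, nil
    case <-ctx.Done():
        return nil, errors.New("no free items")
    }
}

func main() {
    v := newVec(2)
    v.push(&item{id: 1, alive: false})
    v.push(&item{id: 2, alive: true})
    v.push(&item{id: 3, alive: true}) // buffer is full: item 1 is evicted

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    for {
        it, err := v.pop(ctx)
        if err != nil {
            fmt.Println(err) // no free items
            return
        }
        fmt.Println("got item", it.id)
    }
}

As in Vec.Push, nothing is stored when the buffer is full and every buffered element is still healthy; the new element is simply dropped.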
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
deleted file mode 100644
index e10ecdae..00000000
--- a/pkg/worker_watcher/container/interface.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package container
-
-import (
- "context"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Vector interface represents vector container
-type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
- // Destroy used to stop releasing the workers
- Destroy()
-}
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
new file mode 100644
index 00000000..edf81d60
--- /dev/null
+++ b/pkg/worker_watcher/container/queue/queue.go
@@ -0,0 +1,102 @@
+package queue
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
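+ // sizing strategy: the first node starts with a slice of capacity initialSize
+ // and may hold up to maxInitialSize entries before a new node is linked;
+ // every subsequent node uses slices limited to maxInternalSliceSize entries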
+const (
+ initialSize = 1
+ maxInitialSize = 8
+ maxInternalSliceSize = 10
+)
+
+type Node struct {
+ w []worker.BaseProcess
+ // next node in the linked list
+ n *Node
+}
+
+type Queue struct {
+ mu sync.Mutex
+
+ head *Node
+ tail *Node
+
+ curr uint64
+ len uint64
+
+ sliceSize uint64
+}
+
+func NewQueue() *Queue {
+ q := &Queue{
+ mu: sync.Mutex{},
+ head: nil,
+ tail: nil,
+ curr: 0,
+ len: 0,
+ sliceSize: 0,
+ }
+
+ return q
+}
+
+func (q *Queue) Push(w worker.BaseProcess) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ h := newNode(initialSize)
+ q.head = h
+ q.tail = h
+ q.sliceSize = maxInitialSize
+ } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
+ n := newNode(maxInternalSliceSize)
+ q.tail.n = n
+ q.tail = n
+ q.sliceSize = maxInternalSliceSize
+ }
+
+ q.tail.w = append(q.tail.w, w)
+
+ atomic.AddUint64(&q.len, 1)
+
+ q.mu.Unlock()
+}
+
+func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ q.mu.Lock()
+
+ if q.head == nil {
+ // empty queue: release the lock before returning
+ q.mu.Unlock()
+ return nil, nil
+ }
+
+ w := q.head.w[q.curr]
+ q.head.w[q.curr] = nil
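+ // adding ^uint64(0) is an atomic decrement of q.len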
+ atomic.AddUint64(&q.len, ^uint64(0))
+ atomic.AddUint64(&q.curr, 1)
+
+ if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
+ n := q.head.n
+ q.head.n = nil
+ q.head = n
+ q.curr = 0
+ }
+
+ q.mu.Unlock()
+
+ return w, nil
+}
+
+func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
+
+}
+
+func (q *Queue) Destroy() {}
+
+func newNode(capacity int) *Node {
+ return &Node{w: make([]worker.BaseProcess, 0, capacity)}
+}
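Queue stores workers in a linked list of small slices: Push appends to the tail node and links a fresh node once the tail slice reaches its size limit, while Pop reads the head node at index curr and moves to the next node when the head slice is drained. The following is a compact, self-contained sketch of that idea, using ints instead of worker.BaseProcess; all names are illustrative only.

package main

import "fmt"

const sliceSize = 4

type node struct {
    items []int
    next  *node
}

type queue struct {
    head, tail *node
    curr       int // read index inside the head node
}

func (q *queue) push(v int) {
    if q.head == nil {
        n := &node{items: make([]int, 0, sliceSize)}
        q.head, q.tail = n, n
    } else if len(q.tail.items) >= sliceSize {
        // tail slice is full: link a fresh node instead of growing the slice
        n := &node{items: make([]int, 0, sliceSize)}
        q.tail.next = n
        q.tail = n
    }
    q.tail.items = append(q.tail.items, v)
}

func (q *queue) pop() (int, bool) {
    if q.head == nil {
        return 0, false
    }
    v := q.head.items[q.curr]
    q.curr++
    if q.curr >= len(q.head.items) {
        // head slice is drained: drop it and move to the next node
        q.head = q.head.next
        q.curr = 0
        if q.head == nil {
            q.tail = nil
        }
    }
    return v, true
}

func main() {
    q := &queue{}
    for i := 1; i <= 10; i++ {
        q.push(i)
    }
    for {
        v, ok := q.pop()
        if !ok {
            break
        }
        fmt.Print(v, " ") // 1 2 3 4 5 6 7 8 9 10
    }
    fmt.Println()
}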
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
deleted file mode 100644
index 24b5fa6d..00000000
--- a/pkg/worker_watcher/container/vec.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package container
-
-import (
- "context"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Vec struct {
- destroy uint64
- workers chan worker.BaseProcess
-}
-
-func NewVector(initialNumOfWorkers uint64) *Vec {
- vec := &Vec{
- destroy: 0,
- workers: make(chan worker.BaseProcess, initialNumOfWorkers),
- }
-
- return vec
-}
-
-func (v *Vec) Enqueue(w worker.BaseProcess) {
- v.workers <- w
-}
-
-func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
- /*
- if *addr == old {
- *addr = new
- return true
- }
- */
-
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
- return nil, errors.E(errors.WatcherStopped)
- }
-
- select {
- case w := <-v.workers:
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
- }
-}
-
-func (v *Vec) Destroy() {
- atomic.StoreUint64(&v.destroy, 1)
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b2d61d48..348be199 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -8,45 +8,54 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
)
// Vector interface represents vector container
type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Push is used to put a worker into the vector
+ Push(worker.BaseProcess)
+ // Pop is used to get a worker from the vector
+ Pop(ctx context.Context) (worker.BaseProcess, error)
+ // Remove removes the worker with the provided pid
+ Remove(pid int64)
// Destroy used to stop releasing the workers
Destroy()
+
+ // TODO: add a Replace method and remove the `Remove` method; Replace will do both removal and allocation
+ // Replace(prevPid int64, newWorker worker.BaseProcess)
+}
+
+type workerWatcher struct {
+ sync.RWMutex
+ container Vector
+ // used to control the Destroy stage (to ensure that all workers are back in the container)
+ numWorkers uint64
+
+ workers []worker.BaseProcess
+
+ allocator worker.Allocator
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
- container: container.NewVector(numWorkers),
+ container: channel.NewVector(numWorkers),
numWorkers: numWorkers,
- workers: make([]worker.BaseProcess, 0, numWorkers),
- allocator: allocator,
- events: events,
+
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+
+ allocator: allocator,
+ events: events,
}
return ww
}
-type workerWatcher struct {
- sync.RWMutex
- container Vector
- // used to control the Destroy stage (that all workers are in the container)
- numWorkers uint64
- workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
-}
-
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.container.Enqueue(workers[i])
+ ww.container.Push(workers[i])
// add worker to watch slice
ww.workers = append(ww.workers, workers[i])
@@ -57,12 +66,12 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+// Take is not a thread-safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
- w, err := ww.container.Dequeue(ctx)
+ w, err := ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
}
@@ -78,11 +87,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
// =========================================================
// SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
+ _ = w.Kill()
+ // no free workers in the container, or the worker is not in the Ready state (TTL-ed)
// try to continuously get free one
for {
- w, err = ww.container.Dequeue(ctx)
+ w, err = ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
@@ -98,7 +107,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
case worker.StateReady:
return w, nil
case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
+ ww.container.Push(w) // put it back, let worker finish the work
continue
case
// all the possible wrong states
@@ -135,7 +144,7 @@ func (ww *workerWatcher) Allocate() error {
// unlock Allocate mutex
ww.Unlock()
// push the worker to the container
- ww.Push(sw)
+ ww.Release(sw)
return nil
}
@@ -158,11 +167,11 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
}
}
-// Push O(1) operation
-func (ww *workerWatcher) Push(w worker.BaseProcess) {
+// Release is an O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
switch w.State().Value() {
case worker.StateReady:
- ww.container.Enqueue(w)
+ ww.container.Push(w)
default:
_ = w.Kill()
}
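On the caller's side, Take and Release form the acquire/return pair shown in the hunks above: Take pops a worker from the container, blocking until one is free or the context expires, and Release pushes a ready worker back, killing it otherwise. Below is a rough usage sketch of that round trip against a hypothetical pool type; only the call pattern mirrors this diff, the type and method bodies are simplified stand-ins.

package main

import (
    "context"
    "fmt"
    "time"
)

type worker struct{ pid int64 }

type pool struct{ free chan *worker }

// Take blocks until a free worker is available or the context expires,
// mirroring workerWatcher.Take backed by the channel container.
func (p *pool) Take(ctx context.Context) (*worker, error) {
    select {
    case w := <-p.free:
        return w, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// Release returns the worker to the pool, mirroring workerWatcher.Release
// for a worker that is still in the ready state.
func (p *pool) Release(w *worker) {
    p.free <- w
}

func main() {
    p := &pool{free: make(chan *worker, 1)}
    p.free <- &worker{pid: 1}

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    w, err := p.Take(ctx)
    if err != nil {
        fmt.Println("no free workers:", err)
        return
    }
    fmt.Println("working with pid", w.pid)
    p.Release(w)
}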
@@ -226,13 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
+ // remove worker
+ ww.Remove(w)
+
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
- ww.Remove(w)
+ // set state as stopped
+ w.State().Set(worker.StateStopped)
+
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{