summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go78
1 files changed, 46 insertions, 32 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b2d61d48..348be199 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -8,45 +8,54 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
)
// Vector interface represents vector container
type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Push used to put worker to the vector
+ Push(worker.BaseProcess)
+ // Pop used to get worker from the vector
+ Pop(ctx context.Context) (worker.BaseProcess, error)
+ // Remove worker with provided pid
+ Remove(pid int64)
// Destroy used to stop releasing the workers
Destroy()
+
+ // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
+ // Replace(prevPid int64, newWorker worker.BaseProcess)
+}
+
+type workerWatcher struct {
+ sync.RWMutex
+ container Vector
+ // used to control Destroy stage (that all workers are in the container)
+ numWorkers uint64
+
+ workers []worker.BaseProcess
+
+ allocator worker.Allocator
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
- container: container.NewVector(numWorkers),
+ container: channel.NewVector(numWorkers),
numWorkers: numWorkers,
- workers: make([]worker.BaseProcess, 0, numWorkers),
- allocator: allocator,
- events: events,
+
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+
+ allocator: allocator,
+ events: events,
}
return ww
}
-type workerWatcher struct {
- sync.RWMutex
- container Vector
- // used to control the Destroy stage (that all workers are in the container)
- numWorkers uint64
- workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
-}
-
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.container.Enqueue(workers[i])
+ ww.container.Push(workers[i])
// add worker to watch slice
ww.workers = append(ww.workers, workers[i])
@@ -57,12 +66,12 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+// Take is not a thread safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
- w, err := ww.container.Dequeue(ctx)
+ w, err := ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
}
@@ -78,11 +87,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
// =========================================================
// SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
+ _ = w.Kill()
+ // no free workers in the container or worker not in the ReadyState (TTL-ed)
// try to continuously get free one
for {
- w, err = ww.container.Dequeue(ctx)
+ w, err = ww.container.Pop(ctx)
if errors.Is(errors.WatcherStopped, err) {
return nil, errors.E(op, errors.WatcherStopped)
@@ -98,7 +107,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
case worker.StateReady:
return w, nil
case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
+ ww.container.Push(w) // put it back, let worker finish the work
continue
case
// all the possible wrong states
@@ -135,7 +144,7 @@ func (ww *workerWatcher) Allocate() error {
// unlock Allocate mutex
ww.Unlock()
// push the worker to the container
- ww.Push(sw)
+ ww.Release(sw)
return nil
}
@@ -158,11 +167,11 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
}
}
-// Push O(1) operation
-func (ww *workerWatcher) Push(w worker.BaseProcess) {
+// Release O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
switch w.State().Value() {
case worker.StateReady:
- ww.container.Enqueue(w)
+ ww.container.Push(w)
default:
_ = w.Kill()
}
@@ -226,13 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
+ // remove worker
+ ww.Remove(w)
+
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
- ww.Remove(w)
+ // set state as stopped
+ w.State().Set(worker.StateStopped)
+
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{