summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go21
1 files changed, 16 insertions, 5 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index f82de958..b2d61d48 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,8 +11,18 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Destroy used to stop releasing the workers
+ Destroy()
+}
+
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
numWorkers: numWorkers,
@@ -26,7 +36,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events
type workerWatcher struct {
sync.RWMutex
- container container.Vector
+ container Vector
// used to control the Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// Push O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
- if w.State().Value() != worker.StateReady {
+ switch w.State().Value() {
+ case worker.StateReady:
+ ww.container.Enqueue(w)
+ default:
_ = w.Kill()
- return
}
- ww.container.Enqueue(w)
}
// Destroy all underlying container (but let them to complete the task)