diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 17 |
1 files changed, 14 insertions, 3 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index e0dae7f6..b2d61d48 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -11,6 +11,16 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) +// Vector interface represents vector container +type Vector interface { + // Enqueue used to put worker to the vector + Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector + Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Destroy used to stop releasing the workers + Destroy() +} + // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ @@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { // Push O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { - if w.State().Value() != worker.StateReady { + switch w.State().Value() { + case worker.StateReady: + ww.container.Enqueue(w) + default: _ = w.Kill() - return } - ww.container.Enqueue(w) } // Destroy all underlying container (but let them to complete the task) |