summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 19:11:29 +0300
committerValery Piashchynski <[email protected]>2021-07-14 19:11:29 +0300
commit3ce3b5a6e0839e31d2cfb3d3b6fa7f9c6ca2e0af (patch)
tree0f975f62cc60b8ab75c92691f08270242c36f311 /pkg/worker_watcher
parentcd07985494b3ebb03fd6553bed9aa1393052ffc5 (diff)
parent67db4b5f7b66e9a32713133baed83c3ab7146bb8 (diff)
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
# Conflicts: # pkg/worker_watcher/interface.go
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/interface.go1
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go17
2 files changed, 15 insertions, 3 deletions
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 8fa88fe8..e7503467 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,3 +1,4 @@
+
package worker_watcher //nolint:stylecheck
import (
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index e0dae7f6..b2d61d48 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,6 +11,16 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Destroy used to stop releasing the workers
+ Destroy()
+}
+
// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
@@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// Push O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
- if w.State().Value() != worker.StateReady {
+ switch w.State().Value() {
+ case worker.StateReady:
+ ww.container.Enqueue(w)
+ default:
_ = w.Kill()
- return
}
- ww.container.Enqueue(w)
}
// Destroy all underlying container (but let them to complete the task)