diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 16:46:32 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 16:46:32 +0300 |
commit | 4151bbffe7b3ab882de5f7ac29f41c974679f087 (patch) | |
tree | c29840fe2b0e530c069f47ec956b606cd8ff6b1d /pkg/worker_watcher/worker_watcher.go | |
parent | 9d018f259b45be9268ae85e089a07f25de894f41 (diff) |
Fix TTL issue, added explanation comments.
The worker after it executed the request, may overwrite the TTL state.
This inconsistency leads to the +1 worker in the FIFO channel.
In this state, the Push operation was blocked.
Add RR_BROADCAST_PATH.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index f82de958..b2d61d48 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -11,8 +11,18 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) +// Vector interface represents vector container +type Vector interface { + // Enqueue used to put worker to the vector + Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector + Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Destroy used to stop releasing the workers + Destroy() +} + // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ container: container.NewVector(numWorkers), numWorkers: numWorkers, @@ -26,7 +36,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events type workerWatcher struct { sync.RWMutex - container container.Vector + container Vector // used to control the Destroy stage (that all workers are in the container) numWorkers uint64 workers []worker.BaseProcess @@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { // Push O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { - if w.State().Value() != worker.StateReady { + switch w.State().Value() { + case worker.StateReady: + ww.container.Enqueue(w) + default: _ = w.Kill() - return } - ww.container.Enqueue(w) } // Destroy all underlying container (but let them to complete the task) |