From 2f727a7b3f1587c65eb5ab3a32287ee628cbbc99 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 7 Nov 2021 20:32:22 +0300 Subject: update vector Signed-off-by: Valery Piashchynski --- worker_watcher/container/channel/vec.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 63df052f..3dfe855e 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -29,7 +29,6 @@ func NewVector(len uint64) *Vec { // Push is O(1) operation // In case of TTL and full channel O(n) worst case, where n is len of the channel func (v *Vec) Push(w worker.BaseProcess) { - // Non-blocking channel send select { case v.workers <- w: // default select branch is only possible when dealing with TTL @@ -51,13 +50,20 @@ func (v *Vec) Push(w worker.BaseProcess) { */ wrk := <-v.workers switch wrk.State().Value() { - // skip good states, put worker back + // good states case worker.StateWorking, worker.StateReady: // put the worker back // generally, while send and receive operations are concurrent (from the channel), channel behave // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO - v.workers <- wrk - continue + select { + case v.workers <- wrk: + continue + default: + // kill the new worker and reallocate it + w.State().Set(worker.StateInvalid) + _ = w.Kill() + continue + } /* Bad states are here. */ -- cgit v1.2.3