diff options
author | Valery Piashchynski <[email protected]> | 2021-11-07 20:32:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-11-07 20:32:22 +0300 |
commit | d97b9c8989616dff32aaa5cbbca7d9fbe893fc9b (patch) | |
tree | 9d986bb4fc21fb37da9247faacd4a84e04250a68 | |
parent | 210764506d9d84d66c70e28197b03c6c3069feb3 (diff) |
update vector
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | worker_watcher/container/channel/vec.go | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 63df052f..3dfe855e 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -29,7 +29,6 @@ func NewVector(len uint64) *Vec { // Push is O(1) operation // In case of TTL and full channel O(n) worst case, where n is len of the channel func (v *Vec) Push(w worker.BaseProcess) { - // Non-blocking channel send select { case v.workers <- w: // default select branch is only possible when dealing with TTL @@ -51,13 +50,20 @@ func (v *Vec) Push(w worker.BaseProcess) { */ wrk := <-v.workers switch wrk.State().Value() { - // skip good states, put worker back + // good states case worker.StateWorking, worker.StateReady: // put the worker back // generally, while send and receive operations are concurrent (from the channel), channel behave // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO - v.workers <- wrk - continue + select { + case v.workers <- wrk: + continue + default: + // kill the new worker and reallocate it + w.State().Set(worker.StateInvalid) + _ = w.Kill() + continue + } /* Bad states are here. */ |