summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-11-07 20:42:19 +0300
committerGitHub <[email protected]>2021-11-07 20:42:19 +0300
commit1bbf4a472cc476dafe14710c188748a9be8186e0 (patch)
tree9d986bb4fc21fb37da9247faacd4a84e04250a68
parent210764506d9d84d66c70e28197b03c6c3069feb3 (diff)
parentd97b9c8989616dff32aaa5cbbca7d9fbe893fc9b (diff)
[#855]: fix(vector): non-blocking send in all cases
[#855]: fix(vector): non-blocking send in all cases
-rw-r--r--worker_watcher/container/channel/vec.go14
1 files changed, 10 insertions, 4 deletions
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go
index 63df052f..3dfe855e 100644
--- a/worker_watcher/container/channel/vec.go
+++ b/worker_watcher/container/channel/vec.go
@@ -29,7 +29,6 @@ func NewVector(len uint64) *Vec {
// Push is O(1) operation
// In case of TTL and full channel O(n) worst case, where n is len of the channel
func (v *Vec) Push(w worker.BaseProcess) {
- // Non-blocking channel send
select {
case v.workers <- w:
// default select branch is only possible when dealing with TTL
@@ -51,13 +50,20 @@ func (v *Vec) Push(w worker.BaseProcess) {
*/
wrk := <-v.workers
switch wrk.State().Value() {
- // skip good states, put worker back
+ // good states
case worker.StateWorking, worker.StateReady:
// put the worker back
// generally, while send and receive operations are concurrent (from the channel), channel behave
// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
- v.workers <- wrk
- continue
+ select {
+ case v.workers <- wrk:
+ continue
+ default:
+ // kill the new worker and reallocate it
+ w.State().Set(worker.StateInvalid)
+ _ = w.Kill()
+ continue
+ }
/*
Bad states are here.
*/