summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-11-07 16:08:55 +0300
committerGitHub <[email protected]>2021-11-07 16:08:55 +0300
commit92575ed6246b9adfdc74efced14e527743cf01b5 (patch)
tree9020d9f60fd5b3c3bd0c7f8b674cfe978ea1e4cc
parenta18e8eb737d686a4adf8d965aaf1c5a936c6f8b0 (diff)
parentc639dd2ffde1b6c8679d4b3fe6f16130ed091299 (diff)
[#852]: fix(vector): deadlock on the channel send
[#852]: fix(vector): deadlock on the channel send
-rw-r--r--worker_watcher/container/channel/vec.go12
-rwxr-xr-xworker_watcher/worker_watcher.go4
2 files changed, 10 insertions, 6 deletions
diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go
index fd50c8d1..63df052f 100644
--- a/worker_watcher/container/channel/vec.go
+++ b/worker_watcher/container/channel/vec.go
@@ -69,8 +69,16 @@ func (v *Vec) Push(w worker.BaseProcess) {
// replace with the new one and return from the loop
// new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
// But this case will be handled in the worker_watcher::Get
- v.workers <- w
- return
+ select {
+ case v.workers <- w:
+ return
+ // the place for the new worker was occupied before
+ default:
+ // kill the new worker and reallocate it
+ w.State().Set(worker.StateInvalid)
+ _ = w.Kill()
+ return
+ }
}
}
}
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 544c9789..6cd01177 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -112,10 +112,6 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
return nil, errors.E(op, err)
}
- if err != nil {
- return nil, errors.E(op, err)
- }
-
switch w.State().Value() {
// return only workers in the Ready state
// check first