summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
committerValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
commitc90c11b92e229280477a9b049e65ca1048825dd4 (patch)
tree2a38695cad6dc3095b291575cfb40bc56820d86d /pkg/worker_watcher/worker_watcher.go
parent1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff)
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
1 files changed, 6 insertions, 2 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index ca026383..348be199 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -29,7 +29,7 @@ type Vector interface {
type workerWatcher struct {
sync.RWMutex
container Vector
- // used to control the Destroy stage (that all workers are in the container)
+ // used to control Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
+ // remove worker
+ ww.Remove(w)
+
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
+ // set state as stopped
w.State().Set(worker.StateStopped)
- ww.Remove(w)
+
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{