diff options
author | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
commit | c90c11b92e229280477a9b049e65ca1048825dd4 (patch) | |
tree | 2a38695cad6dc3095b291575cfb40bc56820d86d /pkg/worker_watcher/worker_watcher.go | |
parent | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff) |
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel.
Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1.
Replace third-party amqp091 with the official implementation.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index ca026383..348be199 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -29,7 +29,7 @@ type Vector interface { type workerWatcher struct { sync.RWMutex container Vector - // used to control the Destroy stage (that all workers are in the container) + // used to control Destroy stage (that all workers are in the container) numWorkers uint64 workers []worker.BaseProcess @@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } + // remove worker + ww.Remove(w) + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } + // set state as stopped w.State().Set(worker.StateStopped) - ww.Remove(w) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ |