diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
commit | 52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch) | |
tree | 296d48dab1d5396313c061fbfc930a97af0df9f4 /worker_watcher | |
parent | 15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff) |
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker_watcher')
-rwxr-xr-x | worker_watcher/worker_watcher.go | 30 |
1 files changed, 5 insertions, 25 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 871e6146..d425994e 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -148,11 +148,7 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: wwName, - M: fmt.Sprintf("can't allocate the worker: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err))) // if no timeout, return error immediately if ww.allocateTimeout == 0 { @@ -176,11 +172,7 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: wwName, - M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err))) continue } @@ -287,11 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Send(&events.RREvent{ - T: events.EventWorkerWaitExit, - P: wwName, - M: fmt.Sprintf("error: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err))) } // remove worker @@ -299,11 +287,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Send(&events.RREvent{ - T: events.EventWorkerDestruct, - P: wwName, - M: fmt.Sprintf("pid: %d", w.Pid()), - }) + ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid()))) return } @@ -313,11 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Send(&events.RREvent{ - T: events.EventWorkerProcessExit, - P: wwName, - M: fmt.Sprintf("error: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err))) // no workers at all, panic if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { |