summaryrefslogtreecommitdiff
path: root/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-27 22:42:07 +0300
committerValery Piashchynski <[email protected]>2021-10-27 22:42:07 +0300
commit52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch)
tree296d48dab1d5396313c061fbfc930a97af0df9f4 /worker_watcher
parent15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff)
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker_watcher')
-rwxr-xr-xworker_watcher/worker_watcher.go30
1 files changed, 5 insertions, 25 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 871e6146..d425994e 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -148,11 +148,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: wwName,
- M: fmt.Sprintf("can't allocate the worker: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
@@ -176,11 +172,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: wwName,
- M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
continue
}
@@ -287,11 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerWaitExit,
- P: wwName,
- M: fmt.Sprintf("error: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
}
// remove worker
@@ -299,11 +287,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerDestruct,
- P: wwName,
- M: fmt.Sprintf("pid: %d", w.Pid()),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
return
}
@@ -313,11 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerProcessExit,
- P: wwName,
- M: fmt.Sprintf("error: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {