diff options
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 4 | ||||
-rwxr-xr-x | util/events.go | 9 |
2 files changed, 11 insertions, 2 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 8788e509..170a6ac8 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -279,11 +279,13 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("process wait") err := w.Wait() if err != nil { + ww.mutex.Lock() ww.events.Push(events.WorkerEvent{ Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err), }) + ww.mutex.Unlock() } if w.State().Value() == internal.StateDestroyed { @@ -294,10 +296,12 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { + ww.mutex.Lock() ww.events.Push(events.PoolEvent{ Event: events.EventPoolError, Payload: errors.E(op, err), }) + ww.mutex.Unlock() } } diff --git a/util/events.go b/util/events.go index 259c7ddb..dbb5990b 100755 --- a/util/events.go +++ b/util/events.go @@ -1,12 +1,15 @@ package util import ( + "sync" + "github.com/spiral/roadrunner/v2/interfaces/events" ) // EventHandler helps to broadcast events to multiple listeners. type EventHandler struct { listeners []events.EventListener + sync.RWMutex } func NewEventsHandler() events.Handler { @@ -20,12 +23,14 @@ func (eb *EventHandler) NumListeners() int { // AddListener registers new event listener. func (eb *EventHandler) AddListener(listener events.EventListener) { + eb.Lock() + defer eb.Unlock() eb.listeners = append(eb.listeners, listener) } // Push broadcast events across all event listeners. func (eb *EventHandler) Push(e interface{}) { - for _, listener := range eb.listeners { - listener(e) + for k := range eb.listeners { + eb.listeners[k](e) } } |