Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-x | worker_watcher/worker_watcher.go | 42
1 file changed, 19 insertions(+), 23 deletions(-)
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 175972e0..d425994e 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -13,6 +14,10 @@ import (
 	"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
 )
 
+const (
+	wwName string = "worker_watcher"
+)
+
 // Vector interface represents vector container
 type Vector interface {
 	// Push used to put worker to the vector
@@ -34,25 +39,28 @@ type workerWatcher struct {
 	// used to control Destroy stage (that all workers are in the container)
 	numWorkers *uint64
 
-	workers []worker.BaseProcess
+	workers  []worker.BaseProcess
+	events   events.EventBus
+	eventsID string
 
 	allocator       worker.Allocator
 	allocateTimeout time.Duration
-	events          events.Handler
 }
 
 // NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+	eb, id := events.Bus()
 	ww := &workerWatcher{
 		container: channel.NewVector(numWorkers),
+		events:    eb,
+		eventsID:  id,
 		// pass a ptr to the number of workers to avoid blocking in the TTL loop
 		numWorkers:      utils.Uint64(numWorkers),
 		allocateTimeout: allocateTimeout,
 		workers:         make([]worker.BaseProcess, 0, numWorkers),
 		allocator:       allocator,
-		events:          events,
 	}
 
 	return ww
@@ -140,11 +148,7 @@ func (ww *workerWatcher) Allocate() error {
 	sw, err := ww.allocator()
 	if err != nil {
 		// log incident
-		ww.events.Push(
-			events.WorkerEvent{
-				Event:   events.EventWorkerError,
-				Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
-			})
+		ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
 
 		// if no timeout, return error immediately
 		if ww.allocateTimeout == 0 {
@@ -168,11 +172,7 @@ func (ww *workerWatcher) Allocate() error {
 			sw, err = ww.allocator()
 			if err != nil {
 				// log incident
-				ww.events.Push(
-					events.WorkerEvent{
-						Event:   events.EventWorkerError,
-						Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
-					})
+				ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
 				continue
 			}
@@ -234,6 +234,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
 	ww.container.Destroy()
 	ww.Unlock()
 
+	ww.events.Unsubscribe(ww.eventsID)
 	tt := time.NewTicker(time.Millisecond * 100)
 	defer tt.Stop()
 	for { //nolint:gosimple
@@ -278,10 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 	const op = errors.Op("worker_watcher_wait")
 	err := w.Wait()
 	if err != nil {
-		ww.events.Push(events.WorkerEvent{
-			Event:   events.EventWorkerWaitExit,
-			Payload: err,
-		})
+		ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
 	}
 
 	// remove worker
@@ -289,7 +287,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 	if w.State().Value() == worker.StateDestroyed {
 		// worker was manually destroyed, no need to replace
-		ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+		ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
+
 		return
 	}
 
@@ -298,10 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 	err = ww.Allocate()
 	if err != nil {
-		ww.events.Push(events.PoolEvent{
-			Event: events.EventWorkerProcessExit,
-			Error: errors.E(op, err),
-		})
+		ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
 
 		// no workers at all, panic
 		if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
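For callers, the visible change is twofold: NewSyncWorkerWatcher no longer accepts an events.Handler, because the watcher now obtains the shared event bus itself via events.Bus() (and unsubscribes in Destroy), and events are emitted with events.NewEvent(type, source, message) carrying a plain string instead of the typed WorkerEvent/PoolEvent payloads. The sketch below illustrates that emit-and-cleanup pattern outside the watcher; it is not code from the repository, it assumes the events package is importable as github.com/spiral/roadrunner/v2/events, and the error value is a placeholder used only for illustration.

package main

import (
	"errors"
	"fmt"

	// assumed import path for the reworked events package
	"github.com/spiral/roadrunner/v2/events"
)

// wwName mirrors the source-name constant introduced by this change.
const wwName string = "worker_watcher"

func main() {
	// Obtain the shared event bus plus a subscriber ID, the same call
	// the watcher now makes inside NewSyncWorkerWatcher.
	eb, id := events.Bus()

	// Emit an event: event type, source name, and a string message
	// replace the old typed WorkerEvent/PoolEvent payloads.
	err := errors.New("allocation failed") // placeholder error for illustration only
	eb.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))

	// Release the subscription on shutdown, mirroring the new
	// Unsubscribe call in Destroy().
	eb.Unsubscribe(id)
}

A pool that previously constructed the watcher with NewSyncWorkerWatcher(allocator, numWorkers, events, allocateTimeout) would now call NewSyncWorkerWatcher(allocator, numWorkers, allocateTimeout) and observe worker events through the bus rather than through a handler it supplies.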