diff options
author | Valery Piashchynski <[email protected]> | 2021-10-26 19:22:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-26 19:22:09 +0300 |
commit | 9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch) | |
tree | 8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /worker_watcher/worker_watcher.go | |
parent | 160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff) |
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-x | worker_watcher/worker_watcher.go | 58 |
1 files changed, 37 insertions, 21 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 175972e0..871e6146 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -13,6 +14,10 @@ import ( "github.com/spiral/roadrunner/v2/worker_watcher/container/channel" ) +const ( + wwName string = "worker_watcher" +) + // Vector interface represents vector container type Vector interface { // Push used to put worker to the vector @@ -34,25 +39,28 @@ type workerWatcher struct { // used to control Destroy stage (that all workers are in the container) numWorkers *uint64 - workers []worker.BaseProcess + workers []worker.BaseProcess + events events.EventBus + eventsID string allocator worker.Allocator allocateTimeout time.Duration - events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher { + eb, id := events.Bus() ww := &workerWatcher{ container: channel.NewVector(numWorkers), + events: eb, + eventsID: id, // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: utils.Uint64(numWorkers), allocateTimeout: allocateTimeout, workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, - events: events, } return ww @@ -140,11 +148,11 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), - }) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: wwName, + M: fmt.Sprintf("can't allocate the worker: %v", err), + }) // if no timeout, return error immediately if ww.allocateTimeout == 0 { @@ -168,11 +176,11 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), - }) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: wwName, + M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err), + }) continue } @@ -234,6 +242,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) { ww.container.Destroy() ww.Unlock() + ww.events.Unsubscribe(ww.eventsID) tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() for { //nolint:gosimple @@ -278,9 +287,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Push(events.WorkerEvent{ - Event: events.EventWorkerWaitExit, - Payload: err, + ww.events.Send(&events.RREvent{ + T: events.EventWorkerWaitExit, + P: wwName, + M: fmt.Sprintf("error: %v", err), }) } @@ -289,7 +299,12 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerDestruct, + P: wwName, + M: fmt.Sprintf("pid: %d", w.Pid()), + }) + return } @@ -298,9 +313,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Push(events.PoolEvent{ - Event: events.EventWorkerProcessExit, - Error: errors.E(op, err), + ww.events.Send(&events.RREvent{ + T: events.EventWorkerProcessExit, + P: wwName, + M: fmt.Sprintf("error: %v", err), }) // no workers at all, panic |