diff options
Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-x | worker_watcher/worker_watcher.go | 38 |
1 files changed, 14 insertions, 24 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index cfde9931..cfadb951 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -2,7 +2,6 @@ package worker_watcher //nolint:stylecheck import ( "context" - "fmt" "sync" "sync/atomic" "time" @@ -12,10 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/spiral/roadrunner/v2/worker_watcher/container/channel" -) - -const ( - wwName string = "worker_watcher" + "go.uber.org/zap" ) // Vector interface represents vector container @@ -39,22 +35,19 @@ type workerWatcher struct { // used to control Destroy stage (that all workers are in the container) numWorkers *uint64 - workers []worker.BaseProcess - events events.EventBus - eventsID string + workers []worker.BaseProcess + log *zap.Logger allocator worker.Allocator allocateTimeout time.Duration } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher { - eb, id := events.Bus() - ww := &workerWatcher{ +func NewSyncWorkerWatcher(allocator worker.Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher { + return &workerWatcher{ container: channel.NewVector(numWorkers), - events: eb, - eventsID: id, + log: log, // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: utils.Uint64(numWorkers), allocateTimeout: allocateTimeout, @@ -62,11 +55,11 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocat allocator: allocator, } - - return ww } func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { + ww.Lock() + defer ww.Unlock() for i := 0; i < len(workers); i++ { ww.container.Push(workers[i]) // add worker to watch slice @@ -143,15 +136,14 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err))) // if no timeout, return error immediately if ww.allocateTimeout == 0 { return errors.E(op, errors.WorkerAllocate, err) } - // every half of a second - allocateFreq := time.NewTicker(time.Millisecond * 500) + // every second + allocateFreq := time.NewTicker(time.Millisecond * 1000) tt := time.After(ww.allocateTimeout) for { @@ -167,7 +159,7 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err))) + ww.log.Error("allocate retry attempt failed", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) continue } @@ -282,7 +274,6 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { ww.container.Destroy() ww.Unlock() - ww.events.Unsubscribe(ww.eventsID) tt := time.NewTicker(time.Millisecond * 10) defer tt.Stop() for { @@ -343,7 +334,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("worker stopped, error: %v", err))) + ww.log.Debug("worker stopped", zap.String("internal_event_name", events.EventWorkerWaitExit.String()), zap.Error(err)) } // remove worker @@ -351,8 +342,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("worker destroyed, pid: %d", w.Pid()))) - + ww.log.Debug("worker destroyed", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err)) return } @@ -361,7 +351,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("failed to allocate worker, error: %v", err))) + ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) // no workers at all, panic if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { |