summaryrefslogtreecommitdiff
path: root/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-xworker_watcher/worker_watcher.go38
1 files changed, 14 insertions, 24 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index cfde9931..cfadb951 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,7 +2,6 @@ package worker_watcher //nolint:stylecheck
import (
"context"
- "fmt"
"sync"
"sync/atomic"
"time"
@@ -12,10 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
-)
-
-const (
- wwName string = "worker_watcher"
+ "go.uber.org/zap"
)
// Vector interface represents vector container
@@ -39,22 +35,19 @@ type workerWatcher struct {
// used to control Destroy stage (that all workers are in the container)
numWorkers *uint64
- workers []worker.BaseProcess
- events events.EventBus
- eventsID string
+ workers []worker.BaseProcess
+ log *zap.Logger
allocator worker.Allocator
allocateTimeout time.Duration
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
- eb, id := events.Bus()
- ww := &workerWatcher{
+func NewSyncWorkerWatcher(allocator worker.Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+ return &workerWatcher{
container: channel.NewVector(numWorkers),
- events: eb,
- eventsID: id,
+ log: log,
// pass a ptr to the number of workers to avoid blocking in the TTL loop
numWorkers: utils.Uint64(numWorkers),
allocateTimeout: allocateTimeout,
@@ -62,11 +55,11 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocat
allocator: allocator,
}
-
- return ww
}
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
+ ww.Lock()
+ defer ww.Unlock()
for i := 0; i < len(workers); i++ {
ww.container.Push(workers[i])
// add worker to watch slice
@@ -143,15 +136,14 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
return errors.E(op, errors.WorkerAllocate, err)
}
- // every half of a second
- allocateFreq := time.NewTicker(time.Millisecond * 500)
+ // every second
+ allocateFreq := time.NewTicker(time.Millisecond * 1000)
tt := time.After(ww.allocateTimeout)
for {
@@ -167,7 +159,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
+ ww.log.Error("allocate retry attempt failed", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
continue
}
@@ -282,7 +274,6 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.container.Destroy()
ww.Unlock()
- ww.events.Unsubscribe(ww.eventsID)
tt := time.NewTicker(time.Millisecond * 10)
defer tt.Stop()
for {
@@ -343,7 +334,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("worker stopped, error: %v", err)))
+ ww.log.Debug("worker stopped", zap.String("internal_event_name", events.EventWorkerWaitExit.String()), zap.Error(err))
}
// remove worker
@@ -351,8 +342,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("worker destroyed, pid: %d", w.Pid())))
-
+ ww.log.Debug("worker destroyed", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
return
}
@@ -361,7 +351,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("failed to allocate worker, error: %v", err)))
+ ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {