summaryrefslogtreecommitdiff
path: root/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-xworker_watcher/worker_watcher.go42
1 files changed, 19 insertions, 23 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 175972e0..d425994e 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -13,6 +14,10 @@ import (
"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
)
+const (
+ wwName string = "worker_watcher"
+)
+
// Vector interface represents vector container
type Vector interface {
// Push used to put worker to the vector
@@ -34,25 +39,28 @@ type workerWatcher struct {
// used to control Destroy stage (that all workers are in the container)
numWorkers *uint64
- workers []worker.BaseProcess
+ workers []worker.BaseProcess
+ events events.EventBus
+ eventsID string
allocator worker.Allocator
allocateTimeout time.Duration
- events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+ eb, id := events.Bus()
ww := &workerWatcher{
container: channel.NewVector(numWorkers),
+ events: eb,
+ eventsID: id,
// pass a ptr to the number of workers to avoid blocking in the TTL loop
numWorkers: utils.Uint64(numWorkers),
allocateTimeout: allocateTimeout,
workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
- events: events,
}
return ww
@@ -140,11 +148,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
@@ -168,11 +172,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
continue
}
@@ -234,6 +234,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
ww.container.Destroy()
ww.Unlock()
+ ww.events.Unsubscribe(ww.eventsID)
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
for { //nolint:gosimple
@@ -278,10 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Push(events.WorkerEvent{
- Event: events.EventWorkerWaitExit,
- Payload: err,
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
}
// remove worker
@@ -289,7 +287,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
+
return
}
@@ -298,10 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Push(events.PoolEvent{
- Event: events.EventWorkerProcessExit,
- Error: errors.E(op, err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {