summaryrefslogtreecommitdiff
path: root/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
committerValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
commit9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /worker_watcher
parent160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker_watcher')
-rwxr-xr-xworker_watcher/worker_watcher.go58
1 files changed, 37 insertions, 21 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 175972e0..871e6146 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -13,6 +14,10 @@ import (
"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
)
+const (
+ wwName string = "worker_watcher"
+)
+
// Vector interface represents vector container
type Vector interface {
// Push used to put worker to the vector
@@ -34,25 +39,28 @@ type workerWatcher struct {
// used to control Destroy stage (that all workers are in the container)
numWorkers *uint64
- workers []worker.BaseProcess
+ workers []worker.BaseProcess
+ events events.EventBus
+ eventsID string
allocator worker.Allocator
allocateTimeout time.Duration
- events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+ eb, id := events.Bus()
ww := &workerWatcher{
container: channel.NewVector(numWorkers),
+ events: eb,
+ eventsID: id,
// pass a ptr to the number of workers to avoid blocking in the TTL loop
numWorkers: utils.Uint64(numWorkers),
allocateTimeout: allocateTimeout,
workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
- events: events,
}
return ww
@@ -140,11 +148,11 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
- })
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: wwName,
+ M: fmt.Sprintf("can't allocate the worker: %v", err),
+ })
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
@@ -168,11 +176,11 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
- })
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: wwName,
+ M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err),
+ })
continue
}
@@ -234,6 +242,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
ww.container.Destroy()
ww.Unlock()
+ ww.events.Unsubscribe(ww.eventsID)
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
for { //nolint:gosimple
@@ -278,9 +287,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Push(events.WorkerEvent{
- Event: events.EventWorkerWaitExit,
- Payload: err,
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerWaitExit,
+ P: wwName,
+ M: fmt.Sprintf("error: %v", err),
})
}
@@ -289,7 +299,12 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerDestruct,
+ P: wwName,
+ M: fmt.Sprintf("pid: %d", w.Pid()),
+ })
+
return
}
@@ -298,9 +313,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Push(events.PoolEvent{
- Event: events.EventWorkerProcessExit,
- Error: errors.E(op, err),
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerProcessExit,
+ P: wwName,
+ M: fmt.Sprintf("error: %v", err),
})
// no workers at all, panic