summaryrefslogtreecommitdiff
path: root/pool/supervisor_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
committerValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
commit9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /pool/supervisor_pool.go
parent160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool/supervisor_pool.go')
-rwxr-xr-xpool/supervisor_pool.go49
1 files changed, 34 insertions, 15 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 99af168c..c1fb6eec 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"sync"
"time"
@@ -12,7 +13,10 @@ import (
"github.com/spiral/roadrunner/v2/worker"
)
-const MB = 1024 * 1024
+const (
+ MB = 1024 * 1024
+ supervisorName string = "supervisor"
+)
// NSEC_IN_SEC nanoseconds in second
const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
@@ -24,20 +28,23 @@ type Supervised interface {
}
type supervised struct {
- cfg *SupervisorConfig
- events events.Handler
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
+ cfg *SupervisorConfig
+ events events.EventBus
+ eventsID string
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
+ eb, id := events.Bus()
sp := &supervised{
- cfg: cfg,
- events: events,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
+ cfg: cfg,
+ events: eb,
+ eventsID: id,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
}
return sp
@@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventTTL,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
continue
}
@@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventMaxMemory,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
continue
}
@@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventIdleTTL,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
}
}
}