diff options
author | Valery Piashchynski <[email protected]> | 2021-10-26 19:22:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-26 19:22:09 +0300 |
commit | 9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch) | |
tree | 8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /pool/supervisor_pool.go | |
parent | 160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff) |
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool/supervisor_pool.go')
-rwxr-xr-x | pool/supervisor_pool.go | 49 |
1 files changed, 34 insertions, 15 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 99af168c..c1fb6eec 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -12,7 +13,10 @@ import ( "github.com/spiral/roadrunner/v2/worker" ) -const MB = 1024 * 1024 +const ( + MB = 1024 * 1024 + supervisorName string = "supervisor" +) // NSEC_IN_SEC nanoseconds in second const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck @@ -24,20 +28,23 @@ type Supervised interface { } type supervised struct { - cfg *SupervisorConfig - events events.Handler - pool Pool - stopCh chan struct{} - mu *sync.RWMutex + cfg *SupervisorConfig + events events.EventBus + eventsID string + pool Pool + stopCh chan struct{} + mu *sync.RWMutex } -func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised { + eb, id := events.Bus() sp := &supervised{ - cfg: cfg, - events: events, - pool: pool, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), + cfg: cfg, + events: eb, + eventsID: id, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), } return sp @@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventTTL, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) continue } @@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventMaxMemory, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) continue } @@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventIdleTTL, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) } } } |