diff options
33 files changed, 789 insertions, 708 deletions
@@ -14,6 +14,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.out -covermode=atomic ./priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/events.out -covermode=atomic ./events echo 'mode: atomic' > ./coverage-ci/summary.txt tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt @@ -25,3 +26,4 @@ test: ## Run application tests go test -v -race -tags=debug ./worker_watcher go test -v -race -tags=debug ./bst go test -v -race -tags=debug ./priority_queue + go test -v -race -tags=debug ./events diff --git a/events/docs/events.md b/events/docs/events.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/events/docs/events.md diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 00000000..6c501392 --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,22 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEvenHandler(t *testing.T) { + eh, id := Bus() + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewRREvent(EventJobOK, "foo", "http")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} diff --git a/events/eventsbus.go b/events/eventsbus.go new file mode 100644 index 00000000..79f5babd --- /dev/null +++ b/events/eventsbus.go @@ -0,0 +1,129 @@ +package events + +import ( + "fmt" + "sync" +) + +type sub struct { + pattern string + w *wildcard + events chan<- Event +} + +type eventsBus struct { + sync.RWMutex + subscribers sync.Map + internalEvCh chan Event + stop chan struct{} +} + +func newEventsBus() *eventsBus { + return &eventsBus{ + internalEvCh: make(chan Event, 100), + stop: make(chan struct{}), + } +} + +/* +http.* <- +*/ + +// SubscribeAll for all RR events +// returns subscriptionID +func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + return eb.subscribe(subID, "*", ch) +} + +// SubscribeP pattern like "pluginName.EventType" +func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + return eb.subscribe(subID, pattern, ch) +} + +func (eb *eventsBus) Unsubscribe(subID string) { + eb.subscribers.Delete(subID) +} +func (eb *eventsBus) UnsubscribeP(subID, pattern string) { + if sb, ok := eb.subscribers.Load(subID); ok { + eb.Lock() + defer eb.Unlock() + + sbArr := sb.([]*sub) + + for i := 0; i < len(sbArr); i++ { + if sbArr[i].pattern == pattern { + sbArr[i] = sbArr[len(sbArr)-1] + sbArr = sbArr[:len(sbArr)-1] + // replace with new array + eb.subscribers.Store(subID, sbArr) + return + } + } + } +} + +// Send sends event to the events bus +func (eb *eventsBus) Send(ev Event) { + eb.internalEvCh <- ev +} + +func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { + eb.Lock() + defer eb.Unlock() + w, err := newWildcard(pattern) + if err != nil { + return err + } + + if sb, ok := eb.subscribers.Load(subID); ok { + // at this point we are confident that sb is a []*sub type + subArr := sb.([]*sub) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil + } + + subArr := make([]*sub, 0, 5) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil +} + +func (eb *eventsBus) handleEvents() { + for { //nolint:gosimple + select { + case ev := <-eb.internalEvCh: + // + wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) + + eb.subscribers.Range(func(key, value interface{}) bool { + vsub := value.([]*sub) + + for i := 0; i < len(vsub); i++ { + if vsub[i].w.match(wc) { + select { + case vsub[i].events <- ev: + return true + default: + return true + } + } + } + + return true + }) + } + } +} diff --git a/events/general.go b/events/general.go deleted file mode 100755 index 5cf13e10..00000000 --- a/events/general.go +++ /dev/null @@ -1,41 +0,0 @@ -package events - -import ( - "sync" -) - -const UnknownEventType string = "Unknown event type" - -// HandlerImpl helps to broadcast events to multiple listeners. -type HandlerImpl struct { - listeners []Listener - sync.RWMutex // all receivers should be pointers -} - -func NewEventsHandler() Handler { - return &HandlerImpl{listeners: make([]Listener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *HandlerImpl) NumListeners() int { - eb.Lock() - defer eb.Unlock() - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener Listener) { - eb.Lock() - defer eb.Unlock() - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *HandlerImpl) Push(e interface{}) { - // ReadLock here because we are not changing listeners - eb.RLock() - defer eb.RUnlock() - for k := range eb.listeners { - eb.listeners[k](e) - } -} diff --git a/events/grpc_event.go b/events/grpc_event.go deleted file mode 100644 index 31ff4957..00000000 --- a/events/grpc_event.go +++ /dev/null @@ -1,39 +0,0 @@ -package events - -import ( - "time" - - "google.golang.org/grpc" -) - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk G = iota + 13000 - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr -) - -type G int64 - -func (ev G) String() string { - switch ev { - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - } - return UnknownEventType -} - -// JobEvent represent job event. -type GRPCEvent struct { - Event G - // Info contains unary call info. - Info *grpc.UnaryServerInfo - // Error associated with event. - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/init.go b/events/init.go new file mode 100644 index 00000000..25e92fc5 --- /dev/null +++ b/events/init.go @@ -0,0 +1,20 @@ +package events + +import ( + "sync" + + "github.com/google/uuid" +) + +var evBus *eventsBus +var onInit = &sync.Once{} + +func Bus() (*eventsBus, string) { + onInit.Do(func() { + evBus = newEventsBus() + go evBus.handleEvents() + }) + + // return events bus with subscriberID + return evBus, uuid.NewString() +} diff --git a/events/interface.go b/events/interface.go deleted file mode 100644 index 7d57e4d0..00000000 --- a/events/interface.go +++ /dev/null @@ -1,14 +0,0 @@ -package events - -// Handler interface -type Handler interface { - // NumListeners return number of active listeners - NumListeners() int - // AddListener adds lister to the publisher - AddListener(listener Listener) - // Push pushes event to the listeners - Push(e interface{}) -} - -// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. -type Listener func(event interface{}) diff --git a/events/jobs_events.go b/events/jobs_events.go deleted file mode 100644 index f65ede67..00000000 --- a/events/jobs_events.go +++ /dev/null @@ -1,81 +0,0 @@ -package events - -import ( - "time" -) - -const ( - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK J = iota + 12000 - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady -) - -type J int64 - -func (ev J) String() string { - switch ev { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - } - return UnknownEventType -} - -// JobEvent represent job event. -type JobEvent struct { - Event J - // String is job id. - ID string - // Pipeline name - Pipeline string - // Associated driver name (amqp, ephemeral, etc) - Driver string - // Error for the jobs/pipes errors - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/pool_events.go b/events/pool_events.go deleted file mode 100644 index eb28df6a..00000000 --- a/events/pool_events.go +++ /dev/null @@ -1,71 +0,0 @@ -package events - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 10000 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart -) - -type P int64 - -func (ev P) String() string { - switch ev { - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - } - return UnknownEventType -} - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event P - - // Payload depends on event type, typically it's worker or error. - Payload interface{} - Error error -} diff --git a/events/types.go b/events/types.go new file mode 100644 index 00000000..09bbf7a7 --- /dev/null +++ b/events/types.go @@ -0,0 +1,196 @@ +package events + +import ( + "fmt" +) + +type EventBus interface { + SubscribeAll(subID string, ch chan<- Event) error + SubscribeP(subID string, pattern string, ch chan<- Event) error + Unsubscribe(subID string) + UnsubscribeP(subID, pattern string) + Send(ev Event) +} + +type Event interface { + fmt.Stringer + Plugin() string + Type() EventType + Message() string +} + +type RREvent struct { + // event typ + T EventType + // plugin + P string + // message + M string +} + +// NewRREvent initializes new event +func NewRREvent(t EventType, msg string, plugin string) *RREvent { + return &RREvent{ + T: t, + P: plugin, + M: msg, + } +} + +func (r *RREvent) String() string { + return "RoadRunner event" +} + +func (r *RREvent) Type() EventType { + return r.T +} + +func (r *RREvent) Message() string { + return r.M +} + +func (r *RREvent) Plugin() string { + return r.P +} + +type EventType uint32 + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/wildcard.go b/events/wildcard.go new file mode 100644 index 00000000..171cbf9d --- /dev/null +++ b/events/wildcard.go @@ -0,0 +1,43 @@ +package events + +import ( + "strings" + + "github.com/spiral/errors" +) + +type wildcard struct { + prefix string + suffix string +} + +func newWildcard(pattern string) (*wildcard, error) { + // Normalize + origin := strings.ToLower(pattern) + i := strings.IndexByte(origin, '*') + + /* + http.* + * + *.WorkerError + */ + if i == -1 { + dotI := strings.IndexByte(pattern, '.') + + if dotI == -1 { + // http.SuperEvent + return nil, errors.Str("wrong wildcard, no * or .") + } + + return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil + } + + // pref: http. + // suff: * + return &wildcard{origin[0:i], origin[i+1:]}, nil +} + +func (w wildcard) match(s string) bool { + s = strings.ToLower(s) + return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) +} diff --git a/events/wildcard_test.go b/events/wildcard_test.go new file mode 100644 index 00000000..230ef673 --- /dev/null +++ b/events/wildcard_test.go @@ -0,0 +1,48 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWildcard(t *testing.T) { + w, err := newWildcard("http.*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.True(t, w.match("http.****")) + + // *.* -> * + w, err = newWildcard("*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.True(t, w.match("https.SuperEvent")) + assert.True(t, w.match("")) + assert.True(t, w.match("*")) + assert.True(t, w.match("****")) + assert.True(t, w.match("http.****")) + + w, err = newWildcard("*.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) + + w, err = newWildcard("http.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) +} diff --git a/events/worker_events.go b/events/worker_events.go deleted file mode 100644 index 6b80df61..00000000 --- a/events/worker_events.go +++ /dev/null @@ -1,40 +0,0 @@ -package events - -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -type W int64 - -func (ev W) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - } - return UnknownEventType -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event W - - // Worker triggered the event. - Worker interface{} - - // Event specific payload. - Payload interface{} -} diff --git a/pool/static_pool.go b/pool/static_pool.go index 91bd1c2c..27db830c 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "os/exec" "time" @@ -14,8 +15,12 @@ import ( workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" ) -// StopRequest can be sent by worker to indicate that restart is required. -const StopRequest = "{\"stop\":true}" +const ( + // StopRequest can be sent by worker to indicate that restart is required. + StopRequest = `{"stop":true}` + // pluginName ... + pluginName = "pool" +) // ErrorEncoder encode error or make a decision based on the error type type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) @@ -34,11 +39,8 @@ type StaticPool struct { // creates and connects to stack factory transport.Factory - // distributes the events - events events.Handler - - // saved list of event listeners - listeners []events.Listener + events events.EventBus + eventsID string // manages worker states and TTLs ww Watcher @@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg cfg.MaxJobs = 1 } + eb, id := events.Bus() p := &StaticPool{ - cfg: cfg, - cmd: cmd, - factory: factory, - events: events.NewEventsHandler(), + cfg: cfg, + cmd: cmd, + factory: factory, + events: eb, + eventsID: id, } // add pool options @@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) @@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + sp := supervisorWrapper(p, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg return p, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *StaticPool) { - p.listeners = listeners - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - -// AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.Listener) { - sp.events.AddListener(listener) -} - // GetConfig returns associated pool configuration. Immutable. func (sp *StaticPool) GetConfig() interface{} { return sp.cfg @@ -205,7 +195,11 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()), + }) } } @@ -227,7 +221,11 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)}) + sp.events.Send(&events.RREvent{ + T: events.EventNoFreeWorkers, + P: pluginName, + M: fmt.Sprintf("error: %s", err), + }) return nil, errors.E(op, err) } // else if err not nil - return error @@ -238,6 +236,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work // Destroy all underlying stack (but let them complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { + sp.events.Unsubscribe(sp.eventsID) sp.ww.Destroy(ctx) } @@ -246,12 +245,20 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err}) + sp.events.Send(&events.RREvent{ + T: events.EventExecTTL, + P: pluginName, + M: fmt.Sprintf("error: %s", err), + }) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), + }) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -272,7 +279,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), + }) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -280,7 +291,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return nil, err default: w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerDestruct, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), + }) // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { @@ -296,7 +311,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) + w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd()) if err != nil { return nil, err } @@ -304,9 +319,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // wrap sync worker sw := worker.From(w) - sp.events.Push(events.PoolEvent{ - Event: events.EventWorkerConstruct, - Payload: sw, + sp.events.Send(&events.RREvent{ + T: events.EventWorkerConstruct, + P: pluginName, + M: fmt.Sprintf("pid: %d", sw.Pid()), }) return sw, nil } @@ -329,7 +345,11 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw.State().Set(worker.StateDestroyed) err = sw.Kill() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), + }) return nil, err } @@ -346,7 +366,11 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) if stopErr := sw.Stop(); stopErr != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), + }) } return r, err diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 9861f0d8..abef3779 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -18,6 +18,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfg = &Config{ @@ -167,26 +168,17 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, pipe.NewPipeFactory(), cfg, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -196,22 +188,22 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + event := <-ch + if !strings.Contains(event.Message(), "undefined_function()") { + t.Fatal("event should contain undefiled function()") + } p.Destroy(ctx) } func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() + // Run pool events - ev := make(chan struct{}, 1) - listener := func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - ev <- struct{}{} - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) + require.NoError(t, err) var cfg2 = &Config{ NumWorkers: 1, @@ -224,7 +216,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), cfg2, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -242,7 +233,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, 1, len(p.Workers())) // first creation - <-ev + <-ch // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill() if err != nil { @@ -250,7 +241,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } // re-creation - <-ev + <-ch list := p.Workers() for _, w := range list { @@ -496,15 +487,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventNoFreeWorkers { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) + require.NoError(t, err) p, err := Initialize( ctx, @@ -518,7 +505,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -532,7 +518,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + <-ch p.Destroy(ctx) } diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 99af168c..c1fb6eec 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -12,7 +13,10 @@ import ( "github.com/spiral/roadrunner/v2/worker" ) -const MB = 1024 * 1024 +const ( + MB = 1024 * 1024 + supervisorName string = "supervisor" +) // NSEC_IN_SEC nanoseconds in second const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck @@ -24,20 +28,23 @@ type Supervised interface { } type supervised struct { - cfg *SupervisorConfig - events events.Handler - pool Pool - stopCh chan struct{} - mu *sync.RWMutex + cfg *SupervisorConfig + events events.EventBus + eventsID string + pool Pool + stopCh chan struct{} + mu *sync.RWMutex } -func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised { + eb, id := events.Bus() sp := &supervised{ - cfg: cfg, - events: events, - pool: pool, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), + cfg: cfg, + events: eb, + eventsID: id, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), } return sp @@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventTTL, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) continue } @@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventMaxMemory, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) continue } @@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventIdleTTL, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index aca379c6..c3abf85e 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -326,14 +326,10 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { }, } - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventMaxMemory { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch) + require.NoError(t, err) // constructed // max memory @@ -344,7 +340,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, - AddListeners(listener), ) assert.NoError(t, err) @@ -359,7 +354,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.Empty(t, resp.Body) assert.Empty(t, resp.Context) - <-block + <-ch p.Destroy(context.Background()) } diff --git a/tests/worker-ok.php b/tests/psr-worker-post.php index 63558b0f..2f54af5b 100644 --- a/tests/worker-ok.php +++ b/tests/psr-worker-post.php @@ -1,14 +1,16 @@ <?php + /** * @var Goridge\RelayInterface $relay */ + use Spiral\Goridge; use Spiral\RoadRunner; ini_set('display_errors', 'stderr'); require __DIR__ . "/vendor/autoload.php"; -$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$worker = RoadRunner\Worker::create(); $psr7 = new RoadRunner\Http\PSR7Worker( $worker, new \Nyholm\Psr7\Factory\Psr17Factory(), @@ -19,7 +21,8 @@ $psr7 = new RoadRunner\Http\PSR7Worker( while ($req = $psr7->waitRequest()) { try { $resp = new \Nyholm\Psr7\Response(); - $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? ''); + $resp->getBody()->write((string) $req->getBody()); + $psr7->respond($resp); } catch (\Throwable $e) { $psr7->getWorker()->error((string)$e); diff --git a/tests/temporal-worker.php b/tests/temporal-worker.php deleted file mode 100644 index 5c9c80e6..00000000 --- a/tests/temporal-worker.php +++ /dev/null @@ -1,34 +0,0 @@ -<?php - -declare(strict_types=1); - -require __DIR__ . '/vendor/autoload.php'; - -/** - * @param string $dir - * @return array<string> - */ -$getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } -}; - -$factory = \Temporal\WorkerFactory::create(); - -$worker = $factory->newWorker('default'); - -// register all workflows -foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { - $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); -} - -// register all activity -foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { - $class = 'Temporal\\Tests\\Activity\\' . $name; - $worker->registerActivityImplementations(new $class); -} - -$factory->run(); diff --git a/tests/worker-cors.php b/tests/worker-cors.php deleted file mode 100644 index ea3c986c..00000000 --- a/tests/worker-cors.php +++ /dev/null @@ -1,15 +0,0 @@ -<?php - -use Spiral\RoadRunner\Worker; -use Spiral\RoadRunner\Http\HttpWorker; - -ini_set('display_errors', 'stderr'); -require __DIR__ . '/vendor/autoload.php'; - -$http = new HttpWorker(Worker::create()); - -while ($req = $http->waitRequest()) { - $http->respond(200, 'Response', [ - 'Access-Control-Allow-Origin' => ['*'] - ]); -} diff --git a/tests/worker-deny.php b/tests/worker-deny.php deleted file mode 100644 index 6dc993f6..00000000 --- a/tests/worker-deny.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php -/** - * @var Goridge\RelayInterface $relay - */ -use Spiral\Goridge; -use Spiral\RoadRunner; - -ini_set('display_errors', 'stderr'); -require __DIR__ . "/vendor/autoload.php"; - -$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); -$psr7 = new RoadRunner\Http\PSR7Worker( - $worker, - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() -); - -while ($req = $psr7->waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - if ($req->getAttribute('ws:joinServer')) { - $psr7->respond($resp->withStatus(200)); - } else { - $psr7->respond($resp->withStatus(401)); - } - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/tests/worker-origin.php b/tests/worker-origin.php deleted file mode 100644 index 6ce4de59..00000000 --- a/tests/worker-origin.php +++ /dev/null @@ -1,14 +0,0 @@ -<?php - -use Spiral\RoadRunner\Worker; -use Spiral\RoadRunner\Http\HttpWorker; - -require __DIR__ . '/vendor/autoload.php'; - -$http = new HttpWorker(Worker::create()); - -while ($req = $http->waitRequest()) { - $http->respond(200, 'Response', [ - 'Access-Control-Allow-Origin' => ['*'] - ]); -} diff --git a/tests/worker-stop.php b/tests/worker-stop.php deleted file mode 100644 index 83fc5710..00000000 --- a/tests/worker-stop.php +++ /dev/null @@ -1,26 +0,0 @@ -<?php -/** - * @var Goridge\RelayInterface $relay - */ -use Spiral\Goridge; -use Spiral\RoadRunner; - -ini_set('display_errors', 'stderr'); -require __DIR__ . "/vendor/autoload.php"; - -$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); -$psr7 = new RoadRunner\Http\PSR7Worker( - $worker, - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() -); - -while ($req = $psr7->waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401)); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/transport/interface.go b/transport/interface.go index e20f2b0b..0d6c8e8b 100644 --- a/transport/interface.go +++ b/transport/interface.go @@ -4,7 +4,6 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/worker" ) @@ -12,10 +11,10 @@ import ( type Factory interface { // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorker(*exec.Cmd) (*worker.Process, error) // Close the factory and underlying connections. Close() error } diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 3ea8fd98..c70b3f65 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -5,7 +5,6 @@ import ( "os/exec" "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" ) @@ -27,10 +26,10 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { spCh := make(chan sr) go func() { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case spCh <- sr{ @@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go index 45b7aef8..81004027 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState2(t *testing.T) { @@ -105,21 +106,20 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid2(t *testing.T) { @@ -368,17 +368,13 @@ func Test_Echo_Slow2(t *testing.T) { func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -390,11 +386,11 @@ func Test_Broken2(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index b4ba8c87..8c6d440a 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState(t *testing.T) { @@ -125,22 +126,20 @@ func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid(t *testing.T) { @@ -433,17 +432,13 @@ func Test_Broken(t *testing.T) { t.Parallel() ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -455,11 +450,10 @@ func Test_Broken(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index dfffdf4e..06d7000d 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" @@ -83,12 +82,12 @@ type socketSpawn struct { } // SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { c := make(chan socketSpawn) go func() { ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case c <- socketSpawn{ @@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go index 363a3510..45fb3bd5 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/transport/socket/socket_factory_spawn_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start2(t *testing.T) { @@ -110,21 +111,19 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Invalid2(t *testing.T) { @@ -162,18 +161,12 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -198,7 +191,11 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function() string") + } } func Test_Tcp_Echo2(t *testing.T) { @@ -273,21 +270,19 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout2(t *testing.T) { @@ -331,18 +326,12 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -367,7 +356,11 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Unix_Echo2(t *testing.T) { diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index d517d026..11b34999 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start(t *testing.T) { @@ -124,21 +125,19 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Timeout(t *testing.T) { @@ -203,18 +202,12 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -239,7 +232,11 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Tcp_Echo(t *testing.T) { @@ -368,21 +365,19 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout(t *testing.T) { @@ -444,20 +439,12 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -481,7 +468,12 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } + wg.Wait() } diff --git a/worker/worker.go b/worker/worker.go index 38a1e9ac..5973adc6 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,18 +12,24 @@ import ( "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/utils" "go.uber.org/multierr" ) type Options func(p *Process) +const ( + workerEventsName string = "worker" +) + // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. created time.Time // updates parent supervisor or pool about Process events - events events.Handler + events events.EventBus + eventsID string // state holds information about current Process state, // number of Process executions, buf status change time. @@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } + + eb, id := events.Bus() w := &Process{ - created: time.Now(), - events: events.NewEventsHandler(), - cmd: cmd, - state: NewWorkerState(StateInactive), + created: time.Now(), + events: eb, + eventsID: id, + cmd: cmd, + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { return w, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *Process) { - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -85,11 +86,6 @@ func (w *Process) Created() time.Time { return w.created } -// AddListener registers new worker event listener. -func (w *Process) addListener(listener events.Listener) { - w.events.AddListener(listener) -} - // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. func (w *Process) State() State { @@ -166,6 +162,8 @@ func (w *Process) Wait() error { return nil } + w.events.Unsubscribe(w.eventsID) + return err } @@ -187,9 +185,13 @@ func (w *Process) Stop() error { if err != nil { w.state.Set(StateKilling) _ = w.cmd.Process.Signal(os.Kill) + + w.events.Unsubscribe(w.eventsID) return errors.E(op, errors.Network, err) } + w.state.Set(StateStopped) + w.events.Unsubscribe(w.eventsID) return nil } @@ -201,6 +203,8 @@ func (w *Process) Kill() error { if err != nil { return err } + + w.events.Unsubscribe(w.eventsID) return nil } @@ -210,11 +214,18 @@ func (w *Process) Kill() error { return err } w.state.Set(StateStopped) + + w.events.Unsubscribe(w.eventsID) return nil } // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + w.events.Send(&events.RREvent{ + T: events.EventWorkerStderr, + P: workerEventsName, + M: utils.AsString(p), + }) + return len(p), nil } diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 175972e0..871e6146 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -13,6 +14,10 @@ import ( "github.com/spiral/roadrunner/v2/worker_watcher/container/channel" ) +const ( + wwName string = "worker_watcher" +) + // Vector interface represents vector container type Vector interface { // Push used to put worker to the vector @@ -34,25 +39,28 @@ type workerWatcher struct { // used to control Destroy stage (that all workers are in the container) numWorkers *uint64 - workers []worker.BaseProcess + workers []worker.BaseProcess + events events.EventBus + eventsID string allocator worker.Allocator allocateTimeout time.Duration - events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher { + eb, id := events.Bus() ww := &workerWatcher{ container: channel.NewVector(numWorkers), + events: eb, + eventsID: id, // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: utils.Uint64(numWorkers), allocateTimeout: allocateTimeout, workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, - events: events, } return ww @@ -140,11 +148,11 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), - }) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: wwName, + M: fmt.Sprintf("can't allocate the worker: %v", err), + }) // if no timeout, return error immediately if ww.allocateTimeout == 0 { @@ -168,11 +176,11 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), - }) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: wwName, + M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err), + }) continue } @@ -234,6 +242,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) { ww.container.Destroy() ww.Unlock() + ww.events.Unsubscribe(ww.eventsID) tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() for { //nolint:gosimple @@ -278,9 +287,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Push(events.WorkerEvent{ - Event: events.EventWorkerWaitExit, - Payload: err, + ww.events.Send(&events.RREvent{ + T: events.EventWorkerWaitExit, + P: wwName, + M: fmt.Sprintf("error: %v", err), }) } @@ -289,7 +299,12 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerDestruct, + P: wwName, + M: fmt.Sprintf("pid: %d", w.Pid()), + }) + return } @@ -298,9 +313,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Push(events.PoolEvent{ - Event: events.EventWorkerProcessExit, - Error: errors.E(op, err), + ww.events.Send(&events.RREvent{ + T: events.EventWorkerProcessExit, + P: wwName, + M: fmt.Sprintf("error: %v", err), }) // no workers at all, panic |