From 9d42e1d430c45a21b8eed86cc3d36817f7deeb64 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 26 Oct 2021 19:22:09 +0300 Subject: Events package update Signed-off-by: Valery Piashchynski --- Makefile | 2 + events/docs/events.md | 0 events/events_test.go | 22 +++ events/eventsbus.go | 129 +++++++++++++++++ events/general.go | 41 ------ events/grpc_event.go | 39 ----- events/init.go | 20 +++ events/interface.go | 14 -- events/jobs_events.go | 81 ----------- events/pool_events.go | 71 ---------- events/types.go | 196 ++++++++++++++++++++++++++ events/wildcard.go | 43 ++++++ events/wildcard_test.go | 48 +++++++ events/worker_events.go | 40 ------ pool/static_pool.go | 102 +++++++++----- pool/static_pool_test.go | 58 +++----- pool/supervisor_pool.go | 49 +++++-- pool/supervisor_test.go | 15 +- tests/psr-worker-post.php | 30 ++++ tests/temporal-worker.php | 34 ----- tests/worker-cors.php | 15 -- tests/worker-deny.php | 30 ---- tests/worker-ok.php | 27 ---- tests/worker-origin.php | 14 -- tests/worker-stop.php | 26 ---- transport/interface.go | 5 +- transport/pipe/pipe_factory.go | 9 +- transport/pipe/pipe_factory_spawn_test.go | 46 +++--- transport/pipe/pipe_factory_test.go | 44 +++--- transport/socket/socket_factory.go | 9 +- transport/socket/socket_factory_spawn_test.go | 89 ++++++------ transport/socket/socket_factory_test.go | 92 ++++++------ worker/worker.go | 49 ++++--- worker_watcher/worker_watcher.go | 58 +++++--- 34 files changed, 814 insertions(+), 733 deletions(-) create mode 100644 events/docs/events.md create mode 100644 events/events_test.go create mode 100644 events/eventsbus.go delete mode 100755 events/general.go delete mode 100644 events/grpc_event.go create mode 100644 events/init.go delete mode 100644 events/interface.go delete mode 100644 events/jobs_events.go delete mode 100644 events/pool_events.go create mode 100644 events/types.go create mode 100644 events/wildcard.go create mode 100644 events/wildcard_test.go delete mode 100644 events/worker_events.go create mode 100644 tests/psr-worker-post.php delete mode 100644 tests/temporal-worker.php delete mode 100644 tests/worker-cors.php delete mode 100644 tests/worker-deny.php delete mode 100644 tests/worker-ok.php delete mode 100644 tests/worker-origin.php delete mode 100644 tests/worker-stop.php diff --git a/Makefile b/Makefile index 9c800ea4..d2f4ebc6 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.out -covermode=atomic ./priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -coverpkg=./... 
-coverprofile=./coverage-ci/events.out -covermode=atomic ./events echo 'mode: atomic' > ./coverage-ci/summary.txt tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt @@ -25,3 +26,4 @@ test: ## Run application tests go test -v -race -tags=debug ./worker_watcher go test -v -race -tags=debug ./bst go test -v -race -tags=debug ./priority_queue + go test -v -race -tags=debug ./events diff --git a/events/docs/events.md b/events/docs/events.md new file mode 100644 index 00000000..e69de29b diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 00000000..6c501392 --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,22 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEvenHandler(t *testing.T) { + eh, id := Bus() + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewRREvent(EventJobOK, "foo", "http")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} diff --git a/events/eventsbus.go b/events/eventsbus.go new file mode 100644 index 00000000..79f5babd --- /dev/null +++ b/events/eventsbus.go @@ -0,0 +1,129 @@ +package events + +import ( + "fmt" + "sync" +) + +type sub struct { + pattern string + w *wildcard + events chan<- Event +} + +type eventsBus struct { + sync.RWMutex + subscribers sync.Map + internalEvCh chan Event + stop chan struct{} +} + +func newEventsBus() *eventsBus { + return &eventsBus{ + internalEvCh: make(chan Event, 100), + stop: make(chan struct{}), + } +} + +/* +http.* <- +*/ + +// SubscribeAll for all RR events +// returns subscriptionID +func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + return eb.subscribe(subID, "*", ch) +} + +// SubscribeP pattern like "pluginName.EventType" +func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + return eb.subscribe(subID, pattern, ch) +} + +func (eb *eventsBus) Unsubscribe(subID string) { + eb.subscribers.Delete(subID) +} +func (eb *eventsBus) UnsubscribeP(subID, pattern string) { + if sb, ok := eb.subscribers.Load(subID); ok { + eb.Lock() + defer eb.Unlock() + + sbArr := sb.([]*sub) + + for i := 0; i < len(sbArr); i++ { + if sbArr[i].pattern == pattern { + sbArr[i] = sbArr[len(sbArr)-1] + sbArr = sbArr[:len(sbArr)-1] + // replace with new array + eb.subscribers.Store(subID, sbArr) + return + } + } + } +} + +// Send sends event to the events bus +func (eb *eventsBus) Send(ev Event) { + eb.internalEvCh <- ev +} + +func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { + eb.Lock() + defer eb.Unlock() + w, err := newWildcard(pattern) + if err != nil { + return err + } + + if sb, ok := eb.subscribers.Load(subID); ok { + // at this point we are confident that sb is a []*sub type + subArr := sb.([]*sub) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil + } + + subArr := make([]*sub, 0, 5) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil +} + +func (eb *eventsBus) handleEvents() { + for { //nolint:gosimple + select { + case ev := <-eb.internalEvCh: + // + wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) + + eb.subscribers.Range(func(key, value interface{}) bool { + vsub := value.([]*sub) + + for i := 0; i < len(vsub); i++ { + 
if vsub[i].w.match(wc) { + select { + case vsub[i].events <- ev: + return true + default: + return true + } + } + } + + return true + }) + } + } +} diff --git a/events/general.go b/events/general.go deleted file mode 100755 index 5cf13e10..00000000 --- a/events/general.go +++ /dev/null @@ -1,41 +0,0 @@ -package events - -import ( - "sync" -) - -const UnknownEventType string = "Unknown event type" - -// HandlerImpl helps to broadcast events to multiple listeners. -type HandlerImpl struct { - listeners []Listener - sync.RWMutex // all receivers should be pointers -} - -func NewEventsHandler() Handler { - return &HandlerImpl{listeners: make([]Listener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *HandlerImpl) NumListeners() int { - eb.Lock() - defer eb.Unlock() - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener Listener) { - eb.Lock() - defer eb.Unlock() - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *HandlerImpl) Push(e interface{}) { - // ReadLock here because we are not changing listeners - eb.RLock() - defer eb.RUnlock() - for k := range eb.listeners { - eb.listeners[k](e) - } -} diff --git a/events/grpc_event.go b/events/grpc_event.go deleted file mode 100644 index 31ff4957..00000000 --- a/events/grpc_event.go +++ /dev/null @@ -1,39 +0,0 @@ -package events - -import ( - "time" - - "google.golang.org/grpc" -) - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk G = iota + 13000 - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr -) - -type G int64 - -func (ev G) String() string { - switch ev { - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - } - return UnknownEventType -} - -// JobEvent represent job event. -type GRPCEvent struct { - Event G - // Info contains unary call info. - Info *grpc.UnaryServerInfo - // Error associated with event. - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/init.go b/events/init.go new file mode 100644 index 00000000..25e92fc5 --- /dev/null +++ b/events/init.go @@ -0,0 +1,20 @@ +package events + +import ( + "sync" + + "github.com/google/uuid" +) + +var evBus *eventsBus +var onInit = &sync.Once{} + +func Bus() (*eventsBus, string) { + onInit.Do(func() { + evBus = newEventsBus() + go evBus.handleEvents() + }) + + // return events bus with subscriberID + return evBus, uuid.NewString() +} diff --git a/events/interface.go b/events/interface.go deleted file mode 100644 index 7d57e4d0..00000000 --- a/events/interface.go +++ /dev/null @@ -1,14 +0,0 @@ -package events - -// Handler interface -type Handler interface { - // NumListeners return number of active listeners - NumListeners() int - // AddListener adds lister to the publisher - AddListener(listener Listener) - // Push pushes event to the listeners - Push(e interface{}) -} - -// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. -type Listener func(event interface{}) diff --git a/events/jobs_events.go b/events/jobs_events.go deleted file mode 100644 index f65ede67..00000000 --- a/events/jobs_events.go +++ /dev/null @@ -1,81 +0,0 @@ -package events - -import ( - "time" -) - -const ( - // EventPushOK thrown when new job has been added. JobEvent is passed as context. 
- EventPushOK J = iota + 12000 - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady -) - -type J int64 - -func (ev J) String() string { - switch ev { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - } - return UnknownEventType -} - -// JobEvent represent job event. -type JobEvent struct { - Event J - // String is job id. - ID string - // Pipeline name - Pipeline string - // Associated driver name (amqp, ephemeral, etc) - Driver string - // Error for the jobs/pipes errors - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/pool_events.go b/events/pool_events.go deleted file mode 100644 index eb28df6a..00000000 --- a/events/pool_events.go +++ /dev/null @@ -1,71 +0,0 @@ -package events - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 10000 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). 
- EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart -) - -type P int64 - -func (ev P) String() string { - switch ev { - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - } - return UnknownEventType -} - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event P - - // Payload depends on event type, typically it's worker or error. - Payload interface{} - Error error -} diff --git a/events/types.go b/events/types.go new file mode 100644 index 00000000..09bbf7a7 --- /dev/null +++ b/events/types.go @@ -0,0 +1,196 @@ +package events + +import ( + "fmt" +) + +type EventBus interface { + SubscribeAll(subID string, ch chan<- Event) error + SubscribeP(subID string, pattern string, ch chan<- Event) error + Unsubscribe(subID string) + UnsubscribeP(subID, pattern string) + Send(ev Event) +} + +type Event interface { + fmt.Stringer + Plugin() string + Type() EventType + Message() string +} + +type RREvent struct { + // event typ + T EventType + // plugin + P string + // message + M string +} + +// NewRREvent initializes new event +func NewRREvent(t EventType, msg string, plugin string) *RREvent { + return &RREvent{ + T: t, + P: plugin, + M: msg, + } +} + +func (r *RREvent) String() string { + return "RoadRunner event" +} + +func (r *RREvent) Type() EventType { + return r.T +} + +func (r *RREvent) Message() string { + return r.M +} + +func (r *RREvent) Plugin() string { + return r.P +} + +type EventType uint32 + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. 
+ EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/wildcard.go b/events/wildcard.go new file mode 100644 index 00000000..171cbf9d --- /dev/null +++ b/events/wildcard.go @@ -0,0 +1,43 @@ +package events + +import ( + "strings" + + "github.com/spiral/errors" +) + +type wildcard struct { + prefix string + suffix string +} + +func newWildcard(pattern string) (*wildcard, error) { + // Normalize + origin := strings.ToLower(pattern) + i := strings.IndexByte(origin, '*') + + /* + http.* + * + *.WorkerError + */ + if i == -1 { + dotI := strings.IndexByte(pattern, '.') + + if dotI == -1 { + // http.SuperEvent + return nil, errors.Str("wrong wildcard, no * or .") + } + + return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil + } + + // pref: http. 
+ // suff: * + return &wildcard{origin[0:i], origin[i+1:]}, nil +} + +func (w wildcard) match(s string) bool { + s = strings.ToLower(s) + return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) +} diff --git a/events/wildcard_test.go b/events/wildcard_test.go new file mode 100644 index 00000000..230ef673 --- /dev/null +++ b/events/wildcard_test.go @@ -0,0 +1,48 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWildcard(t *testing.T) { + w, err := newWildcard("http.*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.True(t, w.match("http.****")) + + // *.* -> * + w, err = newWildcard("*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.True(t, w.match("https.SuperEvent")) + assert.True(t, w.match("")) + assert.True(t, w.match("*")) + assert.True(t, w.match("****")) + assert.True(t, w.match("http.****")) + + w, err = newWildcard("*.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) + + w, err = newWildcard("http.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) +} diff --git a/events/worker_events.go b/events/worker_events.go deleted file mode 100644 index 6b80df61..00000000 --- a/events/worker_events.go +++ /dev/null @@ -1,40 +0,0 @@ -package events - -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -type W int64 - -func (ev W) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - } - return UnknownEventType -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event W - - // Worker triggered the event. - Worker interface{} - - // Event specific payload. - Payload interface{} -} diff --git a/pool/static_pool.go b/pool/static_pool.go index 91bd1c2c..27db830c 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "os/exec" "time" @@ -14,8 +15,12 @@ import ( workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" ) -// StopRequest can be sent by worker to indicate that restart is required. -const StopRequest = "{\"stop\":true}" +const ( + // StopRequest can be sent by worker to indicate that restart is required. + StopRequest = `{"stop":true}` + // pluginName ... 
+ pluginName = "pool" +) // ErrorEncoder encode error or make a decision based on the error type type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) @@ -34,11 +39,8 @@ type StaticPool struct { // creates and connects to stack factory transport.Factory - // distributes the events - events events.Handler - - // saved list of event listeners - listeners []events.Listener + events events.EventBus + eventsID string // manages worker states and TTLs ww Watcher @@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg cfg.MaxJobs = 1 } + eb, id := events.Bus() p := &StaticPool{ - cfg: cfg, - cmd: cmd, - factory: factory, - events: events.NewEventsHandler(), + cfg: cfg, + cmd: cmd, + factory: factory, + events: eb, + eventsID: id, } // add pool options @@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) @@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + sp := supervisorWrapper(p, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg return p, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *StaticPool) { - p.listeners = listeners - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - -// AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.Listener) { - sp.events.AddListener(listener) -} - // GetConfig returns associated pool configuration. Immutable. func (sp *StaticPool) GetConfig() interface{} { return sp.cfg @@ -205,7 +195,11 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()), + }) } } @@ -227,7 +221,11 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)}) + sp.events.Send(&events.RREvent{ + T: events.EventNoFreeWorkers, + P: pluginName, + M: fmt.Sprintf("error: %s", err), + }) return nil, errors.E(op, err) } // else if err not nil - return error @@ -238,6 +236,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work // Destroy all underlying stack (but let them complete the task). 
func (sp *StaticPool) Destroy(ctx context.Context) { + sp.events.Unsubscribe(sp.eventsID) sp.ww.Destroy(ctx) } @@ -246,12 +245,20 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err}) + sp.events.Send(&events.RREvent{ + T: events.EventExecTTL, + P: pluginName, + M: fmt.Sprintf("error: %s", err), + }) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), + }) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -272,7 +279,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), + }) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -280,7 +291,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return nil, err default: w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerDestruct, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), + }) // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { @@ -296,7 +311,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) 
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd()) if err != nil { return nil, err } @@ -304,9 +319,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // wrap sync worker sw := worker.From(w) - sp.events.Push(events.PoolEvent{ - Event: events.EventWorkerConstruct, - Payload: sw, + sp.events.Send(&events.RREvent{ + T: events.EventWorkerConstruct, + P: pluginName, + M: fmt.Sprintf("pid: %d", sw.Pid()), }) return sw, nil } @@ -329,7 +345,11 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw.State().Set(worker.StateDestroyed) err = sw.Kill() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), + }) return nil, err } @@ -346,7 +366,11 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) if stopErr := sw.Stop(); stopErr != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: pluginName, + M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), + }) } return r, err diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 9861f0d8..abef3779 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -18,6 +18,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfg = &Config{ @@ -167,26 +168,17 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, pipe.NewPipeFactory(), cfg, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -196,22 +188,22 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + event := <-ch + if !strings.Contains(event.Message(), "undefined_function()") { + t.Fatal("event should contain undefiled function()") + } p.Destroy(ctx) } func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() + // Run pool events - ev := make(chan struct{}, 1) - listener := func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - ev <- struct{}{} - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) + require.NoError(t, err) var cfg2 = &Config{ NumWorkers: 1, @@ -224,7 +216,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), cfg2, - AddListeners(listener), ) assert.NoError(t, err) 
assert.NotNil(t, p) @@ -242,7 +233,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, 1, len(p.Workers())) // first creation - <-ev + <-ch // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill() if err != nil { @@ -250,7 +241,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } // re-creation - <-ev + <-ch list := p.Workers() for _, w := range list { @@ -496,15 +487,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventNoFreeWorkers { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) + require.NoError(t, err) p, err := Initialize( ctx, @@ -518,7 +505,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -532,7 +518,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + <-ch p.Destroy(ctx) } diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 99af168c..c1fb6eec 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -12,7 +13,10 @@ import ( "github.com/spiral/roadrunner/v2/worker" ) -const MB = 1024 * 1024 +const ( + MB = 1024 * 1024 + supervisorName string = "supervisor" +) // NSEC_IN_SEC nanoseconds in second const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck @@ -24,20 +28,23 @@ type Supervised interface { } type supervised struct { - cfg *SupervisorConfig - events events.Handler - pool Pool - stopCh chan struct{} - mu *sync.RWMutex + cfg *SupervisorConfig + events events.EventBus + eventsID string + pool Pool + stopCh chan struct{} + mu *sync.RWMutex } -func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised { + eb, id := events.Bus() sp := &supervised{ - cfg: cfg, - events: events, - pool: pool, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), + cfg: cfg, + events: eb, + eventsID: id, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), } return sp @@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventTTL, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) continue } @@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventMaxMemory, + P: supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) continue } @@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + sp.events.Send(&events.RREvent{ + T: events.EventIdleTTL, + P: 
supervisorName, + M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), + }) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index aca379c6..c3abf85e 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -326,14 +326,10 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { }, } - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventMaxMemory { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch) + require.NoError(t, err) // constructed // max memory @@ -344,7 +340,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, - AddListeners(listener), ) assert.NoError(t, err) @@ -359,7 +354,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.Empty(t, resp.Body) assert.Empty(t, resp.Context) - <-block + <-ch p.Destroy(context.Background()) } diff --git a/tests/psr-worker-post.php b/tests/psr-worker-post.php new file mode 100644 index 00000000..2f54af5b --- /dev/null +++ b/tests/psr-worker-post.php @@ -0,0 +1,30 @@ +waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + $resp->getBody()->write((string) $req->getBody()); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} diff --git a/tests/temporal-worker.php b/tests/temporal-worker.php deleted file mode 100644 index 5c9c80e6..00000000 --- a/tests/temporal-worker.php +++ /dev/null @@ -1,34 +0,0 @@ - - */ -$getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } -}; - -$factory = \Temporal\WorkerFactory::create(); - -$worker = $factory->newWorker('default'); - -// register all workflows -foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { - $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); -} - -// register all activity -foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { - $class = 'Temporal\\Tests\\Activity\\' . $name; - $worker->registerActivityImplementations(new $class); -} - -$factory->run(); diff --git a/tests/worker-cors.php b/tests/worker-cors.php deleted file mode 100644 index ea3c986c..00000000 --- a/tests/worker-cors.php +++ /dev/null @@ -1,15 +0,0 @@ -waitRequest()) { - $http->respond(200, 'Response', [ - 'Access-Control-Allow-Origin' => ['*'] - ]); -} diff --git a/tests/worker-deny.php b/tests/worker-deny.php deleted file mode 100644 index 6dc993f6..00000000 --- a/tests/worker-deny.php +++ /dev/null @@ -1,30 +0,0 @@ -waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - if ($req->getAttribute('ws:joinServer')) { - $psr7->respond($resp->withStatus(200)); - } else { - $psr7->respond($resp->withStatus(401)); - } - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/tests/worker-ok.php b/tests/worker-ok.php deleted file mode 100644 index 63558b0f..00000000 --- a/tests/worker-ok.php +++ /dev/null @@ -1,27 +0,0 @@ -waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? 
''); - $psr7->respond($resp); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/tests/worker-origin.php b/tests/worker-origin.php deleted file mode 100644 index 6ce4de59..00000000 --- a/tests/worker-origin.php +++ /dev/null @@ -1,14 +0,0 @@ -waitRequest()) { - $http->respond(200, 'Response', [ - 'Access-Control-Allow-Origin' => ['*'] - ]); -} diff --git a/tests/worker-stop.php b/tests/worker-stop.php deleted file mode 100644 index 83fc5710..00000000 --- a/tests/worker-stop.php +++ /dev/null @@ -1,26 +0,0 @@ -waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401)); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/transport/interface.go b/transport/interface.go index e20f2b0b..0d6c8e8b 100644 --- a/transport/interface.go +++ b/transport/interface.go @@ -4,7 +4,6 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/worker" ) @@ -12,10 +11,10 @@ import ( type Factory interface { // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorker(*exec.Cmd) (*worker.Process, error) // Close the factory and underlying connections. Close() error } diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 3ea8fd98..c70b3f65 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -5,7 +5,6 @@ import ( "os/exec" "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" ) @@ -27,10 +26,10 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. 
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { spCh := make(chan sr) go func() { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case spCh <- sr{ @@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go index 45b7aef8..81004027 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState2(t *testing.T) { @@ -105,21 +106,20 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid2(t *testing.T) { @@ -368,17 +368,13 @@ func Test_Echo_Slow2(t *testing.T) { func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -390,11 +386,11 @@ func Test_Broken2(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index b4ba8c87..8c6d440a 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) 
func Test_GetState(t *testing.T) { @@ -125,22 +126,20 @@ func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid(t *testing.T) { @@ -433,17 +432,13 @@ func Test_Broken(t *testing.T) { t.Parallel() ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -455,11 +450,10 @@ func Test_Broken(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index dfffdf4e..06d7000d 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" @@ -83,12 +82,12 @@ type socketSpawn struct { } // SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { c := make(chan socketSpawn) go func() { ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case c <- socketSpawn{ @@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/socket/socket_factory_spawn_test.go 
b/transport/socket/socket_factory_spawn_test.go index 363a3510..45fb3bd5 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/transport/socket/socket_factory_spawn_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start2(t *testing.T) { @@ -110,21 +111,19 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Invalid2(t *testing.T) { @@ -162,18 +161,12 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -198,7 +191,11 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function() string") + } } func Test_Tcp_Echo2(t *testing.T) { @@ -273,21 +270,19 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout2(t *testing.T) { @@ -331,18 +326,12 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), 
"undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -367,7 +356,11 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Unix_Echo2(t *testing.T) { diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index d517d026..11b34999 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start(t *testing.T) { @@ -124,21 +125,19 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Timeout(t *testing.T) { @@ -203,18 +202,12 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -239,7 +232,11 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Tcp_Echo(t *testing.T) { @@ -368,21 +365,19 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, 
"worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout(t *testing.T) { @@ -444,20 +439,12 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -481,7 +468,12 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } + wg.Wait() } diff --git a/worker/worker.go b/worker/worker.go index 38a1e9ac..5973adc6 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,18 +12,24 @@ import ( "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/utils" "go.uber.org/multierr" ) type Options func(p *Process) +const ( + workerEventsName string = "worker" +) + // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. created time.Time // updates parent supervisor or pool about Process events - events events.Handler + events events.EventBus + eventsID string // state holds information about current Process state, // number of Process executions, buf status change time. @@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } + + eb, id := events.Bus() w := &Process{ - created: time.Now(), - events: events.NewEventsHandler(), - cmd: cmd, - state: NewWorkerState(StateInactive), + created: time.Now(), + events: eb, + eventsID: id, + cmd: cmd, + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { return w, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *Process) { - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -85,11 +86,6 @@ func (w *Process) Created() time.Time { return w.created } -// AddListener registers new worker event listener. -func (w *Process) addListener(listener events.Listener) { - w.events.AddListener(listener) -} - // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. 
func (w *Process) State() State { @@ -166,6 +162,8 @@ func (w *Process) Wait() error { return nil } + w.events.Unsubscribe(w.eventsID) + return err } @@ -187,9 +185,13 @@ func (w *Process) Stop() error { if err != nil { w.state.Set(StateKilling) _ = w.cmd.Process.Signal(os.Kill) + + w.events.Unsubscribe(w.eventsID) return errors.E(op, errors.Network, err) } + w.state.Set(StateStopped) + w.events.Unsubscribe(w.eventsID) return nil } @@ -201,6 +203,8 @@ func (w *Process) Kill() error { if err != nil { return err } + + w.events.Unsubscribe(w.eventsID) return nil } @@ -210,11 +214,18 @@ func (w *Process) Kill() error { return err } w.state.Set(StateStopped) + + w.events.Unsubscribe(w.eventsID) return nil } // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + w.events.Send(&events.RREvent{ + T: events.EventWorkerStderr, + P: workerEventsName, + M: utils.AsString(p), + }) + return len(p), nil } diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 175972e0..871e6146 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -13,6 +14,10 @@ import ( "github.com/spiral/roadrunner/v2/worker_watcher/container/channel" ) +const ( + wwName string = "worker_watcher" +) + // Vector interface represents vector container type Vector interface { // Push used to put worker to the vector @@ -34,25 +39,28 @@ type workerWatcher struct { // used to control Destroy stage (that all workers are in the container) numWorkers *uint64 - workers []worker.BaseProcess + workers []worker.BaseProcess + events events.EventBus + eventsID string allocator worker.Allocator allocateTimeout time.Duration - events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher { + eb, id := events.Bus() ww := &workerWatcher{ container: channel.NewVector(numWorkers), + events: eb, + eventsID: id, // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: utils.Uint64(numWorkers), allocateTimeout: allocateTimeout, workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, - events: events, } return ww @@ -140,11 +148,11 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), - }) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: wwName, + M: fmt.Sprintf("can't allocate the worker: %v", err), + }) // if no timeout, return error immediately if ww.allocateTimeout == 0 { @@ -168,11 +176,11 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), - }) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerError, + P: wwName, + M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err), + }) continue } 
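For orientation while reading the worker_watcher hunks above and below: a minimal consumer-side sketch of the new bus API (Bus, SubscribeP, Unsubscribe and the Event accessors introduced in this series). The standalone main() wrapper and the chosen pattern are illustrative only; the watcher publishes under the "worker_watcher" plugin name, and NewEvent is the constructor as it exists by the last commit of this series.

```go
package main

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/events"
)

func main() {
	// Bus() returns the global event bus and a unique subscriber ID.
	eb, id := events.Bus()
	defer eb.Unsubscribe(id)

	// Patterns take the form "plugin.EventType"; "worker_watcher.*" would match every watcher event.
	ch := make(chan events.Event, 10)
	if err := eb.SubscribeP(id, "worker_watcher.EventWorkerError", ch); err != nil {
		panic(err)
	}

	// Publish the same kind of event the watcher emits on a failed allocation.
	eb.Send(events.NewEvent(events.EventWorkerError, "worker_watcher", "can't allocate the worker"))

	ev := <-ch
	fmt.Println(ev.Plugin(), ev.Type().String(), ev.Message())
}
```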
@@ -234,6 +242,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) { ww.container.Destroy() ww.Unlock() + ww.events.Unsubscribe(ww.eventsID) tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() for { //nolint:gosimple @@ -278,9 +287,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Push(events.WorkerEvent{ - Event: events.EventWorkerWaitExit, - Payload: err, + ww.events.Send(&events.RREvent{ + T: events.EventWorkerWaitExit, + P: wwName, + M: fmt.Sprintf("error: %v", err), }) } @@ -289,7 +299,12 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + ww.events.Send(&events.RREvent{ + T: events.EventWorkerDestruct, + P: wwName, + M: fmt.Sprintf("pid: %d", w.Pid()), + }) + return } @@ -298,9 +313,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Push(events.PoolEvent{ - Event: events.EventWorkerProcessExit, - Error: errors.E(op, err), + ww.events.Send(&events.RREvent{ + T: events.EventWorkerProcessExit, + P: wwName, + M: fmt.Sprintf("error: %v", err), }) // no workers at all, panic -- cgit v1.2.3 From 82ee0c5a87141f31e4e28089c1a269f70066284b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 13:10:49 +0300 Subject: Add more tests, add Len() method Signed-off-by: Valery Piashchynski --- events/events_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++ events/eventsbus.go | 43 ++++++++++++++++++++++++++++++++++- events/types.go | 11 ++------- events/wildcard.go | 2 +- 4 files changed, 108 insertions(+), 11 deletions(-) diff --git a/events/events_test.go b/events/events_test.go index 6c501392..41944b48 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -20,3 +20,66 @@ func TestEvenHandler(t *testing.T) { require.Equal(t, "http", evt.Plugin()) require.Equal(t, "EventJobOK", evt.Type().String()) } + +func TestEvenHandler2(t *testing.T) { + eh, id := Bus() + eh2, id2 := Bus() + + ch := make(chan Event, 100) + ch2 := make(chan Event, 100) + err := eh2.SubscribeP(id2, "http.EventJobOK", ch) + require.NoError(t, err) + + err = eh.SubscribeP(id, "http.EventJobOK", ch2) + require.NoError(t, err) + + eh.Send(NewRREvent(EventJobOK, "foo", "http")) + + evt := <-ch2 + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) + + l := eh.Len() + require.Equal(t, uint(2), l) + + eh.Unsubscribe(id) + + l = eh.Len() + require.Equal(t, uint(1), l) + + eh2.Unsubscribe(id2) + + l = eh.Len() + require.Equal(t, uint(0), l) +} + +func TestEvenHandler3(t *testing.T) { + eh, id := Bus() + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "EventJobOK", ch) + require.Error(t, err) +} + +func TestEvenHandler4(t *testing.T) { + eh, id := Bus() + + err := eh.SubscribeP(id, "EventJobOK", nil) + require.Error(t, err) +} + +func TestEvenHandler5(t *testing.T) { + eh, id := Bus() + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewRREvent(EventJobOK, "foo", "http")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} diff --git a/events/eventsbus.go b/events/eventsbus.go index 
79f5babd..cd0dca71 100644 --- a/events/eventsbus.go +++ b/events/eventsbus.go @@ -2,7 +2,10 @@ package events import ( "fmt" + "strings" "sync" + + "github.com/spiral/errors" ) type sub struct { @@ -32,17 +35,39 @@ http.* <- // SubscribeAll for all RR events // returns subscriptionID func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + + if subIDTr == "" { + return errors.Str("subscriberID can't be empty") + } + return eb.subscribe(subID, "*", ch) } // SubscribeP pattern like "pluginName.EventType" func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + patternTr := strings.Trim(pattern, " ") + + if subIDTr == "" || patternTr == "" { + return errors.Str("subscriberID or pattern can't be empty") + } + return eb.subscribe(subID, pattern, ch) } func (eb *eventsBus) Unsubscribe(subID string) { eb.subscribers.Delete(subID) } + func (eb *eventsBus) UnsubscribeP(subID, pattern string) { if sb, ok := eb.subscribers.Load(subID); ok { eb.Lock() @@ -64,9 +89,25 @@ func (eb *eventsBus) UnsubscribeP(subID, pattern string) { // Send sends event to the events bus func (eb *eventsBus) Send(ev Event) { + // do not accept nil events + if ev == nil { + return + } + eb.internalEvCh <- ev } +func (eb *eventsBus) Len() uint { + var ln uint + + eb.subscribers.Range(func(key, value interface{}) bool { + ln++ + return true + }) + + return ln +} + func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { eb.Lock() defer eb.Unlock() @@ -105,7 +146,7 @@ func (eb *eventsBus) handleEvents() { for { //nolint:gosimple select { case ev := <-eb.internalEvCh: - // + // http.WorkerError for example wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) eb.subscribers.Range(func(key, value interface{}) bool { diff --git a/events/types.go b/events/types.go index 09bbf7a7..d8e40084 100644 --- a/events/types.go +++ b/events/types.go @@ -1,19 +1,15 @@ package events -import ( - "fmt" -) - type EventBus interface { SubscribeAll(subID string, ch chan<- Event) error SubscribeP(subID string, pattern string, ch chan<- Event) error Unsubscribe(subID string) UnsubscribeP(subID, pattern string) + Len() uint Send(ev Event) } type Event interface { - fmt.Stringer Plugin() string Type() EventType Message() string @@ -30,6 +26,7 @@ type RREvent struct { // NewRREvent initializes new event func NewRREvent(t EventType, msg string, plugin string) *RREvent { + // get return &RREvent{ T: t, P: plugin, @@ -37,10 +34,6 @@ func NewRREvent(t EventType, msg string, plugin string) *RREvent { } } -func (r *RREvent) String() string { - return "RoadRunner event" -} - func (r *RREvent) Type() EventType { return r.T } diff --git a/events/wildcard.go b/events/wildcard.go index 171cbf9d..b4c28ae1 100644 --- a/events/wildcard.go +++ b/events/wildcard.go @@ -26,7 +26,7 @@ func newWildcard(pattern string) (*wildcard, error) { if dotI == -1 { // http.SuperEvent - return nil, errors.Str("wrong wildcard, no * or .") + return nil, errors.Str("wrong wildcard, no * or . 
Usage: http.Event or *.Event or http.*") } return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil -- cgit v1.2.3 From a244937265a14e10c0217d42d2aa893d3ab1535c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 13:45:30 +0300 Subject: update tests Signed-off-by: Valery Piashchynski --- events/events_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/events/events_test.go b/events/events_test.go index 41944b48..afe64d12 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -2,6 +2,7 @@ package events import ( "testing" + "time" "github.com/stretchr/testify/require" ) @@ -44,11 +45,13 @@ func TestEvenHandler2(t *testing.T) { require.Equal(t, uint(2), l) eh.Unsubscribe(id) + time.Sleep(time.Second) l = eh.Len() require.Equal(t, uint(1), l) eh2.Unsubscribe(id2) + time.Sleep(time.Second) l = eh.Len() require.Equal(t, uint(0), l) -- cgit v1.2.3 From e9fe3e11da095ca5569ca59739c97c862fe7fade Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 14:41:57 +0300 Subject: unsubscribe from the bus in the tests Signed-off-by: Valery Piashchynski --- events/events_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/events/events_test.go b/events/events_test.go index afe64d12..6d6e90a2 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -9,6 +9,7 @@ import ( func TestEvenHandler(t *testing.T) { eh, id := Bus() + defer eh.Unsubscribe(id) ch := make(chan Event, 100) err := eh.SubscribeP(id, "http.EventJobOK", ch) @@ -25,6 +26,8 @@ func TestEvenHandler(t *testing.T) { func TestEvenHandler2(t *testing.T) { eh, id := Bus() eh2, id2 := Bus() + defer eh.Unsubscribe(id) + defer eh2.Unsubscribe(id2) ch := make(chan Event, 100) ch2 := make(chan Event, 100) @@ -59,6 +62,7 @@ func TestEvenHandler2(t *testing.T) { func TestEvenHandler3(t *testing.T) { eh, id := Bus() + defer eh.Unsubscribe(id) ch := make(chan Event, 100) err := eh.SubscribeP(id, "EventJobOK", ch) @@ -67,6 +71,7 @@ func TestEvenHandler3(t *testing.T) { func TestEvenHandler4(t *testing.T) { eh, id := Bus() + defer eh.Unsubscribe(id) err := eh.SubscribeP(id, "EventJobOK", nil) require.Error(t, err) @@ -74,6 +79,7 @@ func TestEvenHandler4(t *testing.T) { func TestEvenHandler5(t *testing.T) { eh, id := Bus() + defer eh.Unsubscribe(id) ch := make(chan Event, 100) err := eh.SubscribeP(id, "http.EventJobOK", ch) -- cgit v1.2.3 From 304003ba268681cea7db351c8005670e72f6c7dd Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 17:07:40 +0300 Subject: update CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 343 +++++++++++++++++++++++++++++++---------------------------- 1 file changed, 183 insertions(+), 160 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 380f4874..39f3f0e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,9 +12,9 @@ ```yaml broadcast: - default: - driver: memory - interval: 1 + default: + driver: memory + interval: 1 ``` ### New style: @@ -23,7 +23,7 @@ broadcast: broadcast: default: driver: memory - config: {} <--------------- NEW + config: { } <--------------- NEW ``` ```yaml @@ -37,8 +37,8 @@ kv: memcached-rr: driver: memcached config: <--------------- NEW - addr: - - "127.0.0.1:11211" + addr: + - "127.0.0.1:11211" broadcast: default: @@ -51,8 +51,11 @@ broadcast: ## ๐Ÿ‘€ New: - โœ๏ธ **[BETA]** GRPC plugin update to v2. -- โœ๏ธ [Roadrunner-plugins](https://github.com/spiral/roadrunner-plugins) repository. 
This is the new home for the roadrunner plugins with documentation, configuration samples, and common problems. -- โœ๏ธ **[BETA]** Let's Encrypt support. RR now can obtain an SSL certificate/PK for your domain automatically. Here is the new configuration: +- โœ๏ธ [Roadrunner-plugins](https://github.com/spiral/roadrunner-plugins) repository. This is the new home for the + roadrunner plugins with documentation, configuration samples, and common problems. +- โœ๏ธ **[BETA]** Let's Encrypt support. RR now can obtain an SSL certificate/PK for your domain automatically. Here is + the new configuration: + ```yaml ssl: # Host and port to listen on (eg.: `127.0.0.1:443`). @@ -105,23 +108,25 @@ broadcast: - โœ๏ธ Add a new option to the `logs` plugin to configure the line ending. By default, used `\n`. **New option**: + ```yaml # Logs plugin settings logs: - (....) - # Line ending - # - # Default: "\n". - line_ending: "\n" + (....) + # Line ending + # + # Default: "\n". + line_ending: "\n" ``` - โœ๏ธ HTTP [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level. + ```yaml http: address: 127.0.0.1:55555 max_request_size: 1024 access_logs: true <-------- Access Logs ON/OFF - middleware: [] + middleware: [ ] pool: num_workers: 2 @@ -129,13 +134,16 @@ http: allocate_timeout: 60s destroy_timeout: 60s ``` -- โœ๏ธ HTTP middleware to handle `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). - Middleware reads the file in 10MB chunks. So, for example for the 5Gb file, only 10MB of RSS will be used. If the file size is smaller than 10MB, the middleware fits the buffer to the file size. + +- โœ๏ธ HTTP middleware to handle `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). Middleware + reads the file in 10MB chunks. So, for example for the 5Gb file, only 10MB of RSS will be used. If the file size is + smaller than 10MB, the middleware fits the buffer to the file size. + ```yaml http: address: 127.0.0.1:44444 max_request_size: 1024 - middleware: ["sendfile"] <----- NEW MIDDLEWARE + middleware: [ "sendfile" ] <----- NEW MIDDLEWARE pool: num_workers: 2 @@ -145,6 +153,7 @@ http: ``` - โœ๏ธ Service plugin now supports env variables passing to the script/executable/binary/any like in the `server` plugin: + ```yaml service: some_service_1: @@ -152,21 +161,25 @@ service: process_num: 1 exec_timeout: 5s # s,m,h (seconds, minutes, hours) remain_after_exit: true - env: <----------------- NEW + env: <----------------- NEW foo: "BAR" restart_sec: 1 ``` - โœ๏ธ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key: + ```yaml server: - command: "./script.sh OR sh script.sh" <--- UPDATED - relay: "pipes" - relay_timeout: "20s" + command: "./script.sh OR sh script.sh" <--- UPDATED + relay: "pipes" + relay_timeout: "20s" ``` -The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`. + +The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can +close `stdin`, `stdout` or `stderr`. - โœ๏ธ Nats jobs driver support - [PR](https://github.com/spiral/roadrunner-plugins/pull/68). + ```yaml nats: addr: "demo.nats.io" @@ -194,12 +207,14 @@ jobs: consume: [ "test-1" ] ``` -- Driver uses NATS JetStream API and not compatible with non-js API. + +- Driver uses NATS JetStream API and is not compatible with non-js API. 
-- โœ๏ธ Response API for the NATS, RabbitMQ, SQS and Beanstalk drivers. This means, that you'll be able to respond to a specified in the response queue. - Limitations: - - To send a response to the queue maintained by the RR, you should send it as a `Job` type. There are no limitations for the responses into the other queues (tubes, subjects). +- โœ๏ธ Response API for the NATS, RabbitMQ, SQS and Beanstalk drivers. This means, that you'll be able to respond to a + specified in the response queue. Limitations: + - To send a response to the queue maintained by the RR, you should send it as a `Job` type. There are no limitations + for the responses into the other queues (tubes, subjects). - Driver uses the same endpoint (address) to send the response as specified in the configuration. ## ๐Ÿฉน Fixes: @@ -215,45 +230,53 @@ jobs: - ๐Ÿ“ฆ roadrunner `v2.5.0` - ๐Ÿ“ฆ roadrunner-plugins `v2.5.0` - ๐Ÿ“ฆ roadrunner-temporal `v1.0.10` -- ๐Ÿ“ฆ endure `v1.0.5` +- ๐Ÿ“ฆ endure `v1.0.6` - ๐Ÿ“ฆ goridge `v3.2.3` ## v2.4.1 (13.09.2021) ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: bug with not-idempotent call to the `attributes.Init`. -- ๐Ÿ› Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the pipeline in the configuration. +- ๐Ÿ› Fix: bug with not-idempotent call to the `attributes.Init`. +- ๐Ÿ› Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the + pipeline in the configuration. ## v2.4.0 (02.09.2021) ## ๐Ÿ’” Internal BC: -- ๐Ÿ”จ Pool, worker interfaces: payload now passed and returned by the pointer. +- ๐Ÿ”จ Pool, worker interfaces: payload now passed and returned by the pointer. ## ๐Ÿ‘€ New: -- โœ๏ธ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) -- โœ๏ธ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2) -- โœ๏ธ Support for the Docker images via GitHub packages. -- โœ๏ธ Go 1.17 support for the all spiral packages. +- โœ๏ธ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. + Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered + by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) +- โœ๏ธ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port` + , `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other + plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2) +- โœ๏ธ Support for the Docker images via GitHub packages. +- โœ๏ธ Go 1.17 support for the all spiral packages. ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750). -- ๐Ÿ› Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772). -- ๐Ÿ› Fix: add `debug` pool config key to the `.rr.yaml` configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79). 
+- ๐Ÿ› Fix: fixed bug with goroutines waiting on the internal worker's container + channel, [issue](https://github.com/spiral/roadrunner/issues/750). +- ๐Ÿ› Fix: RR become unresponsive when new workers failed to + re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772). +- ๐Ÿ› Fix: add `debug` pool config key to the `.rr.yaml` + configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79). ## ๐Ÿ“ฆ Packages: -- ๐Ÿ“ฆ Update goridge to `v3.2.1` -- ๐Ÿ“ฆ Update temporal to `v1.0.9` -- ๐Ÿ“ฆ Update endure to `v1.0.4` +- ๐Ÿ“ฆ Update goridge to `v3.2.1` +- ๐Ÿ“ฆ Update temporal to `v1.0.9` +- ๐Ÿ“ฆ Update endure to `v1.0.4` ## ๐Ÿ“ˆ Summary: -- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1) -- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1) +- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1) +- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1) --- @@ -261,13 +284,13 @@ jobs: ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: Do not call the container's Stop method after the container stopped by an error. -- ๐Ÿ› Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749) -- ๐Ÿ› Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749) +- ๐Ÿ› Fix: Do not call the container's Stop method after the container stopped by an error. +- ๐Ÿ› Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749) +- ๐Ÿ› Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749) ## ๐Ÿ“ˆ Summary: -- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1) +- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1) --- @@ -275,32 +298,32 @@ jobs: ## ๐Ÿ‘€ New: -- โœ๏ธ Rework `broadcast` plugin. Add architecture diagrams to the `doc` - folder. [PR](https://github.com/spiral/roadrunner/pull/732) -- โœ๏ธ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736) +- โœ๏ธ Rework `broadcast` plugin. Add architecture diagrams to the `doc` + folder. [PR](https://github.com/spiral/roadrunner/pull/732) +- โœ๏ธ Add `Clear` method to the KV plugin RPC. 
[PR](https://github.com/spiral/roadrunner/pull/736) ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit - reached [PR](https://github.com/spiral/roadrunner/pull/738) -- ๐Ÿ› Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next - request [PR](https://github.com/spiral/roadrunner/pull/738) -- ๐Ÿ› Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717) - , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719) -- ๐Ÿ› Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720) -- ๐Ÿ› Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128) -- ๐Ÿ› Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727) +- ๐Ÿ› Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit + reached [PR](https://github.com/spiral/roadrunner/pull/738) +- ๐Ÿ› Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next + request [PR](https://github.com/spiral/roadrunner/pull/738) +- ๐Ÿ› Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717) + , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719) +- ๐Ÿ› Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720) +- ๐Ÿ› Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128) +- ๐Ÿ› Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727) ## ๐Ÿ“ฆ Packages: -- ๐Ÿ“ฆ Update goridge to `v3.1.4` -- ๐Ÿ“ฆ Update temporal to `v1.0.8` +- ๐Ÿ“ฆ Update goridge to `v3.1.4` +- ๐Ÿ“ฆ Update temporal to `v1.0.8` ## ๐Ÿ“ˆ Summary: -- RR Milestone [2.3.1](https://github.com/spiral/roadrunner/milestone/30?closed=1) -- Temporal Milestone [1.0.8](https://github.com/temporalio/roadrunner-temporal/milestone/11?closed=1) -- Goridge Milestone [3.1.4](https://github.com/spiral/goridge/milestone/11?closed=1) +- RR Milestone [2.3.1](https://github.com/spiral/roadrunner/milestone/30?closed=1) +- Temporal Milestone [1.0.8](https://github.com/temporalio/roadrunner-temporal/milestone/11?closed=1) +- Goridge Milestone [3.1.4](https://github.com/spiral/goridge/milestone/11?closed=1) --- @@ -308,36 +331,36 @@ jobs: ## ๐Ÿ‘€ New: -- โœ๏ธ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of - thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus - on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513) -- โœ๏ธ Protobuf binary messages for the `websockets` and `kv` RPC calls under the - hood. [Issue](https://github.com/spiral/roadrunner/issues/711) -- โœ๏ธ Json-schemas for the config file v1.0 (it also registered - in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) -- โœ๏ธ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) -- โœ๏ธ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error - code. 
[Issue](https://github.com/spiral/roadrunner/issues/659) -- โœ๏ธ Expose HTTP plugin metrics (workers memory, requests count, requests duration) - . [Issue](https://github.com/spiral/roadrunner/issues/489) -- โœ๏ธ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` - scripts. [Issue](https://github.com/spiral/roadrunner/issues/658) -- โœ๏ธ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation) - , [Issue](https://github.com/spiral/roadrunner/issues/545) +- โœ๏ธ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of + thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus + on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513) +- โœ๏ธ Protobuf binary messages for the `websockets` and `kv` RPC calls under the + hood. [Issue](https://github.com/spiral/roadrunner/issues/711) +- โœ๏ธ Json-schemas for the config file v1.0 (it also registered + in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) +- โœ๏ธ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) +- โœ๏ธ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error + code. [Issue](https://github.com/spiral/roadrunner/issues/659) +- โœ๏ธ Expose HTTP plugin metrics (workers memory, requests count, requests duration) + . [Issue](https://github.com/spiral/roadrunner/issues/489) +- โœ๏ธ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` + scripts. [Issue](https://github.com/spiral/roadrunner/issues/658) +- โœ๏ธ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation) + , [Issue](https://github.com/spiral/roadrunner/issues/545) ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686) -- ๐Ÿ› Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in - logs: [Bug](https://github.com/spiral/roadrunner/issues/659) -- ๐Ÿ› Fix: Error message will be properly shown in the log in case of `SoftJob` - error: [Bug](https://github.com/spiral/roadrunner/issues/691) -- ๐Ÿ› Fix: Wrong applied middlewares for the `fcgi` server leads to the - NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) +- ๐Ÿ› Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686) +- ๐Ÿ› Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in + logs: [Bug](https://github.com/spiral/roadrunner/issues/659) +- ๐Ÿ› Fix: Error message will be properly shown in the log in case of `SoftJob` + error: [Bug](https://github.com/spiral/roadrunner/issues/691) +- ๐Ÿ› Fix: Wrong applied middlewares for the `fcgi` server leads to the + NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) ## ๐Ÿ“ฆ Packages: -- ๐Ÿ“ฆ Update goridge to `v3.1.0` +- ๐Ÿ“ฆ Update goridge to `v3.1.0` --- @@ -345,9 +368,9 @@ jobs: ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: revert static plugin. It stays as a separate plugin on the main route (`/`) and supports all the previously - announced features. -- ๐Ÿ› Fix: remove `build` and other old targets from the Makefile. +- ๐Ÿ› Fix: revert static plugin. 
It stays as a separate plugin on the main route (`/`) and supports all the previously + announced features. +- ๐Ÿ› Fix: remove `build` and other old targets from the Makefile. --- @@ -355,21 +378,21 @@ jobs: ## ๐Ÿ‘€ New: -- โœ๏ธ Reworked `static` plugin. Now, it does not affect the performance of the main route and persist on the separate - file server (within the `http` plugin). Looong awaited feature: `Etag` (+ weak Etags) as well with the `If-Mach` - , `If-None-Match`, `If-Range`, `Last-Modified` - and `If-Modified-Since` tags supported. Static plugin has a bunch of new options such as: `allow`, `calculate_etag` - , `weak` and `pattern`. +- โœ๏ธ Reworked `static` plugin. Now, it does not affect the performance of the main route and persist on the separate + file server (within the `http` plugin). Looong awaited feature: `Etag` (+ weak Etags) as well with the `If-Mach` + , `If-None-Match`, `If-Range`, `Last-Modified` + and `If-Modified-Since` tags supported. Static plugin has a bunch of new options such as: `allow`, `calculate_etag` + , `weak` and `pattern`. - ### Option `always` was deleted from the plugin. + ### Option `always` was deleted from the plugin. -- โœ๏ธ Update `informer.List` implementation. Now it returns a list with the all available plugins in the runtime. +- โœ๏ธ Update `informer.List` implementation. Now it returns a list with the all available plugins in the runtime. ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: issue with wrong ordered middlewares (reverse). Now the order is correct. -- ๐Ÿ› Fix: issue when RR fails if a user sets `debug` mode with the `exec_ttl` supervisor option. -- ๐Ÿ› Fix: uniform log levels. Use everywhere the same levels (warn, error, debug, info, panic). +- ๐Ÿ› Fix: issue with wrong ordered middlewares (reverse). Now the order is correct. +- ๐Ÿ› Fix: issue when RR fails if a user sets `debug` mode with the `exec_ttl` supervisor option. +- ๐Ÿ› Fix: uniform log levels. Use everywhere the same levels (warn, error, debug, info, panic). --- @@ -377,102 +400,102 @@ jobs: ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: issue with endure provided wrong logger interface implementation. +- ๐Ÿ› Fix: issue with endure provided wrong logger interface implementation. ## v2.1.0 (27.04.2021) ## ๐Ÿ‘€ New: -- โœ๏ธ New `service` plugin. Docs: [link](https://roadrunner.dev/docs/beep-beep-service) -- โœ๏ธ Stabilize `kv` plugin with `boltdb`, `in-memory`, `memcached` and `redis` drivers. +- โœ๏ธ New `service` plugin. Docs: [link](https://roadrunner.dev/docs/beep-beep-service) +- โœ๏ธ Stabilize `kv` plugin with `boltdb`, `in-memory`, `memcached` and `redis` drivers. ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: Logger didn't provide an anonymous log instance to a plugins w/o `Named` interface implemented. -- ๐Ÿ› Fix: http handler was without log listener after `rr reset`. +- ๐Ÿ› Fix: Logger didn't provide an anonymous log instance to a plugins w/o `Named` interface implemented. +- ๐Ÿ› Fix: http handler was without log listener after `rr reset`. ## v2.0.4 (06.04.2021) ## ๐Ÿ‘€ New: -- โœ๏ธ Add support for `linux/arm64` platform for docker image (thanks @tarampampam). -- โœ๏ธ Add dotenv file support (`.env` in working directory by default; file location can be changed using CLI - flag `--dotenv` or `DOTENV_PATH` environment variable) (thanks @tarampampam). -- ๐Ÿ“œ Add a new `raw` mode for the `logger` plugin to keep the stderr log message of the worker unmodified (logger - severity level should be at least `INFO`). -- ๐Ÿ†• Add Readiness probe check. 
The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there - are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state. +- โœ๏ธ Add support for `linux/arm64` platform for docker image (thanks @tarampampam). +- โœ๏ธ Add dotenv file support (`.env` in working directory by default; file location can be changed using CLI + flag `--dotenv` or `DOTENV_PATH` environment variable) (thanks @tarampampam). +- ๐Ÿ“œ Add a new `raw` mode for the `logger` plugin to keep the stderr log message of the worker unmodified (logger + severity level should be at least `INFO`). +- ๐Ÿ†• Add Readiness probe check. The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there + are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state. ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: bug with the temporal worker which does not follow general graceful shutdown period. +- ๐Ÿ› Fix: bug with the temporal worker which does not follow general graceful shutdown period. ## v2.0.3 (29.03.2021) ## ๐Ÿฉน Fixes: -- ๐Ÿ› Fix: slow last response when reached `max_jobs` limit. +- ๐Ÿ› Fix: slow last response when reached `max_jobs` limit. ## v2.0.2 (06.04.2021) -- ๐Ÿ› Fix: Bug with required Root CA certificate for the SSL, now it's optional. -- ๐Ÿ› Fix: Bug with incorrectly consuming metrics collector from the RPC calls (thanks @dstrop). -- ๐Ÿ†• New: HTTP/FCGI/HTTPS internal logs instead of going to the raw stdout will be displayed in the RR logger at - the `Info` log level. -- โšก New: Builds for the Mac with the M1 processor (arm64). -- ๐Ÿ‘ท Rework ServeHTTP handler logic. Use http.Error instead of writing code directly to the response writer. Other small - improvements. +- ๐Ÿ› Fix: Bug with required Root CA certificate for the SSL, now it's optional. +- ๐Ÿ› Fix: Bug with incorrectly consuming metrics collector from the RPC calls (thanks @dstrop). +- ๐Ÿ†• New: HTTP/FCGI/HTTPS internal logs instead of going to the raw stdout will be displayed in the RR logger at + the `Info` log level. +- โšก New: Builds for the Mac with the M1 processor (arm64). +- ๐Ÿ‘ท Rework ServeHTTP handler logic. Use http.Error instead of writing code directly to the response writer. Other small + improvements. ## v2.0.1 (09.03.2021) -- ๐Ÿ› Fix: incorrect PHP command validation -- ๐Ÿ› Fix: ldflags properly inject RR version -- โฌ†๏ธ Update: README, links to the go.pkg from v1 to v2 -- ๐Ÿ“ฆ Bump golang version in the Dockerfile and in the `go.mod` to 1.16 -- ๐Ÿ“ฆ Bump Endure container to v1.0.0. +- ๐Ÿ› Fix: incorrect PHP command validation +- ๐Ÿ› Fix: ldflags properly inject RR version +- โฌ†๏ธ Update: README, links to the go.pkg from v1 to v2 +- ๐Ÿ“ฆ Bump golang version in the Dockerfile and in the `go.mod` to 1.16 +- ๐Ÿ“ฆ Bump Endure container to v1.0.0. ## v2.0.0 (02.03.2021) -- โœ”๏ธ Add a shared server to create PHP worker pools instead of isolated worker pool in each individual plugin. -- ๐Ÿ†• New plugin system with auto-recovery, easier plugin API. -- ๐Ÿ“œ New `logger` plugin to configure logging for each plugin individually. -- ๐Ÿ” Up to 50% performance increase in HTTP workloads. -- โœ”๏ธ Add **[Temporal Workflow](https://temporal.io)** plugin to run distributed computations on scale. -- โœ”๏ธ Add `debug` flag to reload PHP worker ahead of a request (emulates PHP-FPM behavior). -- โŒ Eliminate `limit` service, now each worker pool includes `supervisor` configuration. 
-- ๐Ÿ†• New resetter, informer plugins to perform hot reloads and observe loggers in a system. -- ๐Ÿ’ซ Expose more HTTP plugin configuration options. -- ๐Ÿ†• Headers, static and gzip services now located in HTTP config. -- ๐Ÿ†• Ability to configure the middleware sequence. -- ๐Ÿ’ฃ Faster Goridge protocol (eliminated 50% of syscalls). -- ๐Ÿ’พ Add support for binary payloads for RPC (`msgpack`). -- ๐Ÿ†• Server no longer stops when a PHP worker dies (attempts to restart). -- ๐Ÿ’พ New RR binary server downloader. -- ๐Ÿ’ฃ Echoing no longer breaks execution (yay!). -- ๐Ÿ†• Migration to ZapLogger instead of Logrus. -- ๐Ÿ’ฅ RR can no longer stuck when studding down with broken tasks in a pipeline. -- ๐Ÿงช More tests, more static analysis. -- ๐Ÿ’ฅ Create a new foundation for new KV, WebSocket, GRPC and Queue plugins. +- โœ”๏ธ Add a shared server to create PHP worker pools instead of isolated worker pool in each individual plugin. +- ๐Ÿ†• New plugin system with auto-recovery, easier plugin API. +- ๐Ÿ“œ New `logger` plugin to configure logging for each plugin individually. +- ๐Ÿ” Up to 50% performance increase in HTTP workloads. +- โœ”๏ธ Add **[Temporal Workflow](https://temporal.io)** plugin to run distributed computations on scale. +- โœ”๏ธ Add `debug` flag to reload PHP worker ahead of a request (emulates PHP-FPM behavior). +- โŒ Eliminate `limit` service, now each worker pool includes `supervisor` configuration. +- ๐Ÿ†• New resetter, informer plugins to perform hot reloads and observe loggers in a system. +- ๐Ÿ’ซ Expose more HTTP plugin configuration options. +- ๐Ÿ†• Headers, static and gzip services now located in HTTP config. +- ๐Ÿ†• Ability to configure the middleware sequence. +- ๐Ÿ’ฃ Faster Goridge protocol (eliminated 50% of syscalls). +- ๐Ÿ’พ Add support for binary payloads for RPC (`msgpack`). +- ๐Ÿ†• Server no longer stops when a PHP worker dies (attempts to restart). +- ๐Ÿ’พ New RR binary server downloader. +- ๐Ÿ’ฃ Echoing no longer breaks execution (yay!). +- ๐Ÿ†• Migration to ZapLogger instead of Logrus. +- ๐Ÿ’ฅ RR can no longer stuck when studding down with broken tasks in a pipeline. +- ๐Ÿงช More tests, more static analysis. +- ๐Ÿ’ฅ Create a new foundation for new KV, WebSocket, GRPC and Queue plugins. ## v2.0.0-RC.4 (20.02.2021) -- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550). -- Endure container update to v1.0.0-RC.2 version. -- Remove unneeded mutex from the `http.Workers` method. -- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557). -- Add static, headers, status, gzip plugins to the `main.go`. -- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error. +- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550). +- Endure container update to v1.0.0-RC.2 version. +- Remove unneeded mutex from the `http.Workers` method. +- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557). +- Add static, headers, status, gzip plugins to the `main.go`. +- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error. ## v2.0.0-RC.3 (17.02.2021) -- Add support for the overwriting `.rr.yaml` keys with values (ref: https://roadrunner.dev/docs/intro-config) -- Make logger plugin optional to define in the config. Default values: level -> `debug`, mode -> `development` -- Add the ability to read env variables from the `.rr.yaml` in the form of: `rpc.listen: {RPC_ADDR}`. 
Reference: - ref: https://roadrunner.dev/docs/intro-config (Environment Variables paragraph) +- Add support for the overwriting `.rr.yaml` keys with values (ref: https://roadrunner.dev/docs/intro-config) +- Make logger plugin optional to define in the config. Default values: level -> `debug`, mode -> `development` +- Add the ability to read env variables from the `.rr.yaml` in the form of: `rpc.listen: {RPC_ADDR}`. Reference: + ref: https://roadrunner.dev/docs/intro-config (Environment Variables paragraph) ## v2.0.0-RC.2 (11.02.2021) -- Update RR to version v2.0.0-RC.2 -- Update Temporal plugin to version v2.0.0-RC.1 -- Update Goridge to version v3.0.1 -- Update Endure to version v1.0.0-RC.1 +- Update RR to version v2.0.0-RC.2 +- Update Temporal plugin to version v2.0.0-RC.1 +- Update Goridge to version v3.0.1 +- Update Endure to version v1.0.0-RC.1 -- cgit v1.2.3 From 801aa5900ebafd6d1837c3bd49ce0d4bf6382735 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 17:20:49 +0300 Subject: update CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39f3f0e6..47bc475f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ # CHANGELOG -## v2.5.0 (20.10.2021) +# v2.6.0 (-.-.2021) + +### ๐Ÿ‘€ New: + +- โœ๏ธ New internal message bus. Available globally. Supports wildcard subscriptions (for example: `http.*` will subscribe you to the all events coming from the `http` plugin). The subscriptions can be made from any RR plugin to any RR plugin. + + +# v2.5.0 (20.10.2021) # ๐Ÿ’” Breaking change: -- cgit v1.2.3 From 15e6f474ef1340f4e93f213bab1cb9548e51a1e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 19:35:10 +0300 Subject: update tests Signed-off-by: Valery Piashchynski --- pool/supervisor_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index c3abf85e..9c0bfdaa 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -265,6 +265,11 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { assert.Empty(t, resp.Context) time.Sleep(time.Second * 2) + + if len(p.Workers()) < 1 { + t.Fatal("should be at least 1 worker") + return + } // should be destroyed, state should be Ready, not Invalid assert.NotEqual(t, pid, p.Workers()[0].Pid()) assert.Equal(t, int64(1), p.Workers()[0].State().Value()) -- cgit v1.2.3 From 52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 27 Oct 2021 22:42:07 +0300 Subject: Make sure events bus properly closed Signed-off-by: Valery Piashchynski --- events/events.go | 143 ++++++++++++++++++++++ events/events_test.go | 6 +- events/types.go | 165 ++------------------------ pool/static_pool.go | 56 ++------- pool/static_pool_test.go | 3 + pool/supervisor_pool.go | 43 +++---- pool/supervisor_test.go | 1 + transport/pipe/pipe_factory_spawn_test.go | 2 + transport/pipe/pipe_factory_test.go | 2 + transport/socket/socket_factory_spawn_test.go | 4 + transport/socket/socket_factory_test.go | 4 + worker/worker.go | 10 +- worker_watcher/worker_watcher.go | 30 +---- 13 files changed, 204 insertions(+), 265 deletions(-) create mode 100644 events/events.go diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..b7396653 --- /dev/null +++ b/events/events.go @@ -0,0 +1,143 @@ +package events + +type EventType uint32 + +const ( + // EventUnaryCallOk represents 
success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. 
+ EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/events_test.go b/events/events_test.go index 6d6e90a2..e15c55d6 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -15,7 +15,7 @@ func TestEvenHandler(t *testing.T) { err := eh.SubscribeP(id, "http.EventJobOK", ch) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) @@ -37,7 +37,7 @@ func TestEvenHandler2(t *testing.T) { err = eh.SubscribeP(id, "http.EventJobOK", ch2) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch2 require.Equal(t, "foo", evt.Message()) @@ -85,7 +85,7 @@ func TestEvenHandler5(t *testing.T) { err := eh.SubscribeP(id, "http.EventJobOK", ch) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) diff --git a/events/types.go b/events/types.go index d8e40084..65a76d15 100644 --- a/events/types.go +++ b/events/types.go @@ -17,173 +17,30 @@ type Event interface { type RREvent struct { // event typ - T EventType + typ EventType // plugin - P string + plugin string // message - M string + message string } -// NewRREvent initializes new event -func NewRREvent(t EventType, msg string, plugin string) *RREvent { - // get +// NewEvent initializes new event +func NewEvent(t EventType, plugin string, msg string) *RREvent { return &RREvent{ - T: t, - P: plugin, - M: msg, + typ: t, + plugin: plugin, + message: msg, } } func (r *RREvent) Type() EventType { - return r.T + return r.typ } func (r *RREvent) Message() string { - return r.M + return r.message } func (r *RREvent) Plugin() string { - return r.P -} - -type EventType uint32 - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk EventType = iota - - // 
EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr - - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady - - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart - - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. 
- EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -func (et EventType) String() string { - switch et { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - - default: - return "UnknownEventType" - } + return r.plugin } diff --git a/pool/static_pool.go b/pool/static_pool.go index 27db830c..11112e72 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -99,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.cfg.Supervisor) + sp := supervisorWrapper(p, eb, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -195,11 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()))) } } @@ -221,11 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Send(&events.RREvent{ - T: events.EventNoFreeWorkers, - P: pluginName, - M: fmt.Sprintf("error: %s", err), - }) + sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err))) return nil, errors.E(op, err) } // else if err not nil - return error @@ -245,20 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Send(&events.RREvent{ - T: events.EventExecTTL, - P: pluginName, - M: fmt.Sprintf("error: %s", err), - }) + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err))) w.State().Set(worker.StateInvalid) 
 			return nil, err
 		case errors.Is(errors.SoftJob, err):
-			sp.events.Send(&events.RREvent{
-				T: events.EventWorkerError,
-				P: pluginName,
-				M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
-			})
+			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
 
 			// if max jobs exceed
 			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -279,11 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		case errors.Is(errors.Network, err):
 			// in case of network error, we can't stop the worker, we should kill it
 			w.State().Set(worker.StateInvalid)
-			sp.events.Send(&events.RREvent{
-				T: events.EventWorkerError,
-				P: pluginName,
-				M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
-			})
+			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
 
 			// kill the worker instead of sending net packet to it
 			_ = w.Kill()
@@ -291,11 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 			return nil, err
 		default:
 			w.State().Set(worker.StateInvalid)
-			sp.events.Send(&events.RREvent{
-				T: events.EventWorkerDestruct,
-				P: pluginName,
-				M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
-			})
+			sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
 			// stop the worker, worker here might be in the broken state (network)
 			errS := w.Stop()
 			if errS != nil {
@@ -319,11 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
 		// wrap sync worker
 		sw := worker.From(w)
 
-		sp.events.Send(&events.RREvent{
-			T: events.EventWorkerConstruct,
-			P: pluginName,
-			M: fmt.Sprintf("pid: %d", sw.Pid()),
-		})
+		sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid())))
 		return sw, nil
 	}
 }
@@ -345,11 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
 	sw.State().Set(worker.StateDestroyed)
 	err = sw.Kill()
 	if err != nil {
-		sp.events.Send(&events.RREvent{
-			T: events.EventWorkerError,
-			P: pluginName,
-			M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
-		})
+		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
 		return nil, err
 	}
 
@@ -366,11 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
 	// redirect call to the worker with TTL
 	r, err := sw.ExecWithTTL(ctx, p)
 	if stopErr := sw.Stop(); stopErr != nil {
-		sp.events.Send(&events.RREvent{
-			T: events.EventWorkerError,
-			P: pluginName,
-			M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
-		})
+		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
 	}
 
 	return r, err
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index abef3779..717d301e 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -170,6 +170,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
 	ctx := context.Background()
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -201,6 +202,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 
 	// Run pool events
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
 	require.NoError(t, err)
@@ -489,6 +491,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
 	ctx := context.Background()
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
 	require.NoError(t, err)
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index c1fb6eec..1a94f6a0 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -28,23 +28,20 @@ type Supervised interface {
 }
 
 type supervised struct {
-	cfg      *SupervisorConfig
-	events   events.EventBus
-	eventsID string
-	pool     Pool
-	stopCh   chan struct{}
-	mu       *sync.RWMutex
+	cfg    *SupervisorConfig
+	events events.EventBus
+	pool   Pool
+	stopCh chan struct{}
+	mu     *sync.RWMutex
 }
 
-func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
-	eb, id := events.Bus()
+func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
 	sp := &supervised{
-		cfg:      cfg,
-		events:   eb,
-		eventsID: id,
-		pool:     pool,
-		mu:       &sync.RWMutex{},
-		stopCh:   make(chan struct{}),
+		cfg:    cfg,
+		events: eb,
+		pool:   pool,
+		mu:     &sync.RWMutex{},
+		stopCh: make(chan struct{}),
 	}
 
 	return sp
@@ -155,11 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Send(&events.RREvent{
-				T: events.EventTTL,
-				P: supervisorName,
-				M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
-			})
+			sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
 			continue
 		}
 
@@ -179,11 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Send(&events.RREvent{
-				T: events.EventMaxMemory,
-				P: supervisorName,
-				M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
-			})
+			sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
 			continue
 		}
 
@@ -238,11 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double-check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Send(&events.RREvent{
-				T: events.EventIdleTTL,
-				P: supervisorName,
-				M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
-			})
+			sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
 		}
 	}
 }
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 9c0bfdaa..eb3c37dd 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -332,6 +332,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
 	}
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
 	require.NoError(t, err)
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 81004027..256176de 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -108,6 +108,7 @@ func Test_Pipe_Failboot2(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/failboot.php")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -370,6 +371,7 @@ func Test_Broken2(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index 8c6d440a..0f527cd5 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -127,6 +127,7 @@ func Test_Pipe_Failboot(t *testing.T) {
 	ctx := context.Background()
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -434,6 +435,7 @@ func Test_Broken(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 45fb3bd5..2db2fd40 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -112,6 +112,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/failboot.php")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -162,6 +163,7 @@ func Test_Tcp_Broken2(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -271,6 +273,7 @@ func Test_Unix_Failboot2(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/failboot.php")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -327,6 +330,7 @@ func Test_Unix_Broken2(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 11b34999..7b28a847 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -126,6 +126,7 @@ func Test_Tcp_Failboot(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/failboot.php")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -203,6 +204,7 @@ func Test_Tcp_Broken(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -366,6 +368,7 @@ func Test_Unix_Failboot(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/failboot.php")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
@@ -440,6 +443,7 @@ func Test_Unix_Broken(t *testing.T) {
 	cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
 
 	eb, id := events.Bus()
+	defer eb.Unsubscribe(id)
 	ch := make(chan events.Event, 10)
 	err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
 	require.NoError(t, err)
diff --git a/worker/worker.go b/worker/worker.go
index 5973adc6..05c6dd0d 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -135,6 +135,7 @@ func (w *Process) Wait() error {
 	const op = errors.Op("process_wait")
 	var err error
 	err = w.cmd.Wait()
+	defer w.events.Unsubscribe(w.eventsID)
 
 	// If worker was destroyed, just exit
 	if w.State().Value() == StateDestroyed {
@@ -162,8 +163,6 @@ func (w *Process) Wait() error {
 		return nil
 	}
 
-	w.events.Unsubscribe(w.eventsID)
-
 	return err
 }
 
@@ -221,11 +220,6 @@ func (w *Process) Kill() error {
 
 // Worker stderr
 func (w *Process) Write(p []byte) (n int, err error) {
-	w.events.Send(&events.RREvent{
-		T: events.EventWorkerStderr,
-		P: workerEventsName,
-		M: utils.AsString(p),
-	})
-
+	w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p)))
 	return len(p), nil
 }
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 871e6146..d425994e 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -148,11 +148,7 @@ func (ww *workerWatcher) Allocate() error {
 	sw, err := ww.allocator()
 	if err != nil {
 		// log incident
-		ww.events.Send(&events.RREvent{
-			T: events.EventWorkerError,
-			P: wwName,
-			M: fmt.Sprintf("can't allocate the worker: %v", err),
-		})
+		ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
 
 		// if no timeout, return error immediately
 		if ww.allocateTimeout == 0 {
@@ -176,11 +172,7 @@ func (ww *workerWatcher) Allocate() error {
 				sw, err = ww.allocator()
 				if err != nil {
 					// log incident
-					ww.events.Send(&events.RREvent{
-						T: events.EventWorkerError,
-						P: wwName,
-						M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err),
-					})
+					ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
 					continue
 				}
 
@@ -287,11 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 	const op = errors.Op("worker_watcher_wait")
 	err := w.Wait()
 	if err != nil {
-		ww.events.Send(&events.RREvent{
-			T: events.EventWorkerWaitExit,
-			P: wwName,
-			M: fmt.Sprintf("error: %v", err),
-		})
+		ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
 	}
 
 	// remove worker
@@ -299,11 +287,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 
 	if w.State().Value() == worker.StateDestroyed {
 		// worker was manually destroyed, no need to replace
-		ww.events.Send(&events.RREvent{
-			T: events.EventWorkerDestruct,
-			P: wwName,
-			M: fmt.Sprintf("pid: %d", w.Pid()),
-		})
+		ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
 		return
 	}
 
@@ -313,11 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 
 	err = ww.Allocate()
 	if err != nil {
-		ww.events.Send(&events.RREvent{
-			T: events.EventWorkerProcessExit,
-			P: wwName,
-			M: fmt.Sprintf("error: %v", err),
-		})
+		ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
 
 		// no workers at all, panic
 		if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
-- 
cgit v1.2.3