summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--events/docs/events.md0
-rw-r--r--events/events_test.go22
-rw-r--r--events/eventsbus.go129
-rwxr-xr-xevents/general.go41
-rw-r--r--events/grpc_event.go39
-rw-r--r--events/init.go20
-rw-r--r--events/interface.go14
-rw-r--r--events/jobs_events.go81
-rw-r--r--events/pool_events.go71
-rw-r--r--events/types.go196
-rw-r--r--events/wildcard.go43
-rw-r--r--events/wildcard_test.go48
-rw-r--r--events/worker_events.go40
-rwxr-xr-xpool/static_pool.go102
-rwxr-xr-xpool/static_pool_test.go58
-rwxr-xr-xpool/supervisor_pool.go49
-rw-r--r--pool/supervisor_test.go15
-rw-r--r--tests/psr-worker-post.php (renamed from tests/worker-ok.php)7
-rw-r--r--tests/temporal-worker.php34
-rw-r--r--tests/worker-cors.php15
-rw-r--r--tests/worker-deny.php30
-rw-r--r--tests/worker-origin.php14
-rw-r--r--tests/worker-stop.php26
-rw-r--r--transport/interface.go5
-rwxr-xr-xtransport/pipe/pipe_factory.go9
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go46
-rwxr-xr-xtransport/pipe/pipe_factory_test.go44
-rwxr-xr-xtransport/socket/socket_factory.go9
-rw-r--r--transport/socket/socket_factory_spawn_test.go89
-rwxr-xr-xtransport/socket/socket_factory_test.go92
-rwxr-xr-xworker/worker.go49
-rwxr-xr-xworker_watcher/worker_watcher.go58
33 files changed, 789 insertions, 708 deletions
diff --git a/Makefile b/Makefile
index 9c800ea4..d2f4ebc6 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.out -covermode=atomic ./priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/events.out -covermode=atomic ./events
echo 'mode: atomic' > ./coverage-ci/summary.txt
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt
@@ -25,3 +26,4 @@ test: ## Run application tests
go test -v -race -tags=debug ./worker_watcher
go test -v -race -tags=debug ./bst
go test -v -race -tags=debug ./priority_queue
+ go test -v -race -tags=debug ./events
diff --git a/events/docs/events.md b/events/docs/events.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/events/docs/events.md
diff --git a/events/events_test.go b/events/events_test.go
new file mode 100644
index 00000000..6c501392
--- /dev/null
+++ b/events/events_test.go
@@ -0,0 +1,22 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestEvenHandler(t *testing.T) {
+ eh, id := Bus()
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
diff --git a/events/eventsbus.go b/events/eventsbus.go
new file mode 100644
index 00000000..79f5babd
--- /dev/null
+++ b/events/eventsbus.go
@@ -0,0 +1,129 @@
+package events
+
+import (
+ "fmt"
+ "sync"
+)
+
+type sub struct {
+ pattern string
+ w *wildcard
+ events chan<- Event
+}
+
+type eventsBus struct {
+ sync.RWMutex
+ subscribers sync.Map
+ internalEvCh chan Event
+ stop chan struct{}
+}
+
+func newEventsBus() *eventsBus {
+ return &eventsBus{
+ internalEvCh: make(chan Event, 100),
+ stop: make(chan struct{}),
+ }
+}
+
+/*
+http.* <-
+*/
+
+// SubscribeAll for all RR events
+// returns subscriptionID
+func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
+ return eb.subscribe(subID, "*", ch)
+}
+
+// SubscribeP pattern like "pluginName.EventType"
+func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
+ return eb.subscribe(subID, pattern, ch)
+}
+
+func (eb *eventsBus) Unsubscribe(subID string) {
+ eb.subscribers.Delete(subID)
+}
+func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ eb.Lock()
+ defer eb.Unlock()
+
+ sbArr := sb.([]*sub)
+
+ for i := 0; i < len(sbArr); i++ {
+ if sbArr[i].pattern == pattern {
+ sbArr[i] = sbArr[len(sbArr)-1]
+ sbArr = sbArr[:len(sbArr)-1]
+ // replace with new array
+ eb.subscribers.Store(subID, sbArr)
+ return
+ }
+ }
+ }
+}
+
+// Send sends event to the events bus
+func (eb *eventsBus) Send(ev Event) {
+ eb.internalEvCh <- ev
+}
+
+func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
+ eb.Lock()
+ defer eb.Unlock()
+ w, err := newWildcard(pattern)
+ if err != nil {
+ return err
+ }
+
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ // at this point we are confident that sb is a []*sub type
+ subArr := sb.([]*sub)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+ }
+
+ subArr := make([]*sub, 0, 5)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+}
+
+func (eb *eventsBus) handleEvents() {
+ for { //nolint:gosimple
+ select {
+ case ev := <-eb.internalEvCh:
+ //
+ wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ vsub := value.([]*sub)
+
+ for i := 0; i < len(vsub); i++ {
+ if vsub[i].w.match(wc) {
+ select {
+ case vsub[i].events <- ev:
+ return true
+ default:
+ return true
+ }
+ }
+ }
+
+ return true
+ })
+ }
+ }
+}
diff --git a/events/general.go b/events/general.go
deleted file mode 100755
index 5cf13e10..00000000
--- a/events/general.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package events
-
-import (
- "sync"
-)
-
-const UnknownEventType string = "Unknown event type"
-
-// HandlerImpl helps to broadcast events to multiple listeners.
-type HandlerImpl struct {
- listeners []Listener
- sync.RWMutex // all receivers should be pointers
-}
-
-func NewEventsHandler() Handler {
- return &HandlerImpl{listeners: make([]Listener, 0, 2)}
-}
-
-// NumListeners returns number of event listeners.
-func (eb *HandlerImpl) NumListeners() int {
- eb.Lock()
- defer eb.Unlock()
- return len(eb.listeners)
-}
-
-// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener Listener) {
- eb.Lock()
- defer eb.Unlock()
- eb.listeners = append(eb.listeners, listener)
-}
-
-// Push broadcast events across all event listeners.
-func (eb *HandlerImpl) Push(e interface{}) {
- // ReadLock here because we are not changing listeners
- eb.RLock()
- defer eb.RUnlock()
- for k := range eb.listeners {
- eb.listeners[k](e)
- }
-}
diff --git a/events/grpc_event.go b/events/grpc_event.go
deleted file mode 100644
index 31ff4957..00000000
--- a/events/grpc_event.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package events
-
-import (
- "time"
-
- "google.golang.org/grpc"
-)
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk G = iota + 13000
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-)
-
-type G int64
-
-func (ev G) String() string {
- switch ev {
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type GRPCEvent struct {
- Event G
- // Info contains unary call info.
- Info *grpc.UnaryServerInfo
- // Error associated with event.
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/init.go b/events/init.go
new file mode 100644
index 00000000..25e92fc5
--- /dev/null
+++ b/events/init.go
@@ -0,0 +1,20 @@
+package events
+
+import (
+ "sync"
+
+ "github.com/google/uuid"
+)
+
+var evBus *eventsBus
+var onInit = &sync.Once{}
+
+func Bus() (*eventsBus, string) {
+ onInit.Do(func() {
+ evBus = newEventsBus()
+ go evBus.handleEvents()
+ })
+
+ // return events bus with subscriberID
+ return evBus, uuid.NewString()
+}
diff --git a/events/interface.go b/events/interface.go
deleted file mode 100644
index 7d57e4d0..00000000
--- a/events/interface.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package events
-
-// Handler interface
-type Handler interface {
- // NumListeners return number of active listeners
- NumListeners() int
- // AddListener adds lister to the publisher
- AddListener(listener Listener)
- // Push pushes event to the listeners
- Push(e interface{})
-}
-
-// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
-type Listener func(event interface{})
diff --git a/events/jobs_events.go b/events/jobs_events.go
deleted file mode 100644
index f65ede67..00000000
--- a/events/jobs_events.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package events
-
-import (
- "time"
-)
-
-const (
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK J = iota + 12000
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-)
-
-type J int64
-
-func (ev J) String() string {
- switch ev {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type JobEvent struct {
- Event J
- // String is job id.
- ID string
- // Pipeline name
- Pipeline string
- // Associated driver name (amqp, ephemeral, etc)
- Driver string
- // Error for the jobs/pipes errors
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/pool_events.go b/events/pool_events.go
deleted file mode 100644
index eb28df6a..00000000
--- a/events/pool_events.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package events
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct P = iota + 10000
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-)
-
-type P int64
-
-func (ev P) String() string {
- switch ev {
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
- }
- return UnknownEventType
-}
-
-// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
-type PoolEvent struct {
- // Event type, see below.
- Event P
-
- // Payload depends on event type, typically it's worker or error.
- Payload interface{}
- Error error
-}
diff --git a/events/types.go b/events/types.go
new file mode 100644
index 00000000..09bbf7a7
--- /dev/null
+++ b/events/types.go
@@ -0,0 +1,196 @@
+package events
+
+import (
+ "fmt"
+)
+
+type EventBus interface {
+ SubscribeAll(subID string, ch chan<- Event) error
+ SubscribeP(subID string, pattern string, ch chan<- Event) error
+ Unsubscribe(subID string)
+ UnsubscribeP(subID, pattern string)
+ Send(ev Event)
+}
+
+type Event interface {
+ fmt.Stringer
+ Plugin() string
+ Type() EventType
+ Message() string
+}
+
+type RREvent struct {
+ // event typ
+ T EventType
+ // plugin
+ P string
+ // message
+ M string
+}
+
+// NewRREvent initializes new event
+func NewRREvent(t EventType, msg string, plugin string) *RREvent {
+ return &RREvent{
+ T: t,
+ P: plugin,
+ M: msg,
+ }
+}
+
+func (r *RREvent) String() string {
+ return "RoadRunner event"
+}
+
+func (r *RREvent) Type() EventType {
+ return r.T
+}
+
+func (r *RREvent) Message() string {
+ return r.M
+}
+
+func (r *RREvent) Plugin() string {
+ return r.P
+}
+
+type EventType uint32
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk EventType = iota
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+ // EventWorkerWaitExit is the worker exit event
+ EventWorkerWaitExit
+)
+
+func (et EventType) String() string {
+ switch et {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ case EventWorkerWaitExit:
+ return "EventWorkerWaitExit"
+
+ default:
+ return "UnknownEventType"
+ }
+}
diff --git a/events/wildcard.go b/events/wildcard.go
new file mode 100644
index 00000000..171cbf9d
--- /dev/null
+++ b/events/wildcard.go
@@ -0,0 +1,43 @@
+package events
+
+import (
+ "strings"
+
+ "github.com/spiral/errors"
+)
+
+type wildcard struct {
+ prefix string
+ suffix string
+}
+
+func newWildcard(pattern string) (*wildcard, error) {
+ // Normalize
+ origin := strings.ToLower(pattern)
+ i := strings.IndexByte(origin, '*')
+
+ /*
+ http.*
+ *
+ *.WorkerError
+ */
+ if i == -1 {
+ dotI := strings.IndexByte(pattern, '.')
+
+ if dotI == -1 {
+ // http.SuperEvent
+ return nil, errors.Str("wrong wildcard, no * or .")
+ }
+
+ return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil
+ }
+
+ // pref: http.
+ // suff: *
+ return &wildcard{origin[0:i], origin[i+1:]}, nil
+}
+
+func (w wildcard) match(s string) bool {
+ s = strings.ToLower(s)
+ return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix)
+}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
new file mode 100644
index 00000000..230ef673
--- /dev/null
+++ b/events/wildcard_test.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWildcard(t *testing.T) {
+ w, err := newWildcard("http.*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ // *.* -> *
+ w, err = newWildcard("*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.True(t, w.match("https.SuperEvent"))
+ assert.True(t, w.match(""))
+ assert.True(t, w.match("*"))
+ assert.True(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ w, err = newWildcard("*.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+
+ w, err = newWildcard("http.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+}
diff --git a/events/worker_events.go b/events/worker_events.go
deleted file mode 100644
index 6b80df61..00000000
--- a/events/worker_events.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package events
-
-const (
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError W = iota + 11000
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
-)
-
-type W int64
-
-func (ev W) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
- }
- return UnknownEventType
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see below.
- Event W
-
- // Worker triggered the event.
- Worker interface{}
-
- // Event specific payload.
- Payload interface{}
-}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 91bd1c2c..27db830c 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"os/exec"
"time"
@@ -14,8 +15,12 @@ import (
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
)
-// StopRequest can be sent by worker to indicate that restart is required.
-const StopRequest = "{\"stop\":true}"
+const (
+ // StopRequest can be sent by worker to indicate that restart is required.
+ StopRequest = `{"stop":true}`
+ // pluginName ...
+ pluginName = "pool"
+)
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
@@ -34,11 +39,8 @@ type StaticPool struct {
// creates and connects to stack
factory transport.Factory
- // distributes the events
- events events.Handler
-
- // saved list of event listeners
- listeners []events.Listener
+ events events.EventBus
+ eventsID string
// manages worker states and TTLs
ww Watcher
@@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
cfg.MaxJobs = 1
}
+ eb, id := events.Bus()
p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: events.NewEventsHandler(),
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: eb,
+ eventsID: id,
}
// add pool options
@@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return p, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *StaticPool) {
- p.listeners = listeners
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
-// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.Listener) {
- sp.events.AddListener(listener)
-}
-
// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
@@ -205,7 +195,11 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()),
+ })
}
}
@@ -227,7 +221,11 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)})
+ sp.events.Send(&events.RREvent{
+ T: events.EventNoFreeWorkers,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s", err),
+ })
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -238,6 +236,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.events.Unsubscribe(sp.eventsID)
sp.ww.Destroy(ctx)
}
@@ -246,12 +245,20 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventExecTTL,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s", err),
+ })
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+ })
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -272,7 +279,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+ })
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -280,7 +291,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerDestruct,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+ })
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -296,7 +311,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd())
if err != nil {
return nil, err
}
@@ -304,9 +319,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Push(events.PoolEvent{
- Event: events.EventWorkerConstruct,
- Payload: sw,
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerConstruct,
+ P: pluginName,
+ M: fmt.Sprintf("pid: %d", sw.Pid()),
})
return sw, nil
}
@@ -329,7 +345,11 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw.State().Set(worker.StateDestroyed)
err = sw.Kill()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
+ })
return nil, err
}
@@ -346,7 +366,11 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// redirect call to the worker with TTL
r, err := sw.ExecWithTTL(ctx, p)
if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: pluginName,
+ M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
+ })
}
return r, err
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 9861f0d8..abef3779 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -18,6 +18,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfg = &Config{
@@ -167,26 +168,17 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
-
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
pipe.NewPipeFactory(),
cfg,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -196,22 +188,22 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ event := <-ch
+ if !strings.Contains(event.Message(), "undefined_function()") {
+ t.Fatal("event should contain undefiled function()")
+ }
p.Destroy(ctx)
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
+
// Run pool events
- ev := make(chan struct{}, 1)
- listener := func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- ev <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
+ require.NoError(t, err)
var cfg2 = &Config{
NumWorkers: 1,
@@ -224,7 +216,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
cfg2,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -242,7 +233,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, 1, len(p.Workers()))
// first creation
- <-ev
+ <-ch
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
@@ -250,7 +241,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
// re-creation
- <-ev
+ <-ch
list := p.Workers()
for _, w := range list {
@@ -496,15 +487,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventNoFreeWorkers {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
@@ -518,7 +505,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
DestroyTimeout: time.Second,
Supervisor: nil,
},
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -532,7 +518,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ <-ch
p.Destroy(ctx)
}
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 99af168c..c1fb6eec 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"sync"
"time"
@@ -12,7 +13,10 @@ import (
"github.com/spiral/roadrunner/v2/worker"
)
-const MB = 1024 * 1024
+const (
+ MB = 1024 * 1024
+ supervisorName string = "supervisor"
+)
// NSEC_IN_SEC nanoseconds in second
const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
@@ -24,20 +28,23 @@ type Supervised interface {
}
type supervised struct {
- cfg *SupervisorConfig
- events events.Handler
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
+ cfg *SupervisorConfig
+ events events.EventBus
+ eventsID string
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
+ eb, id := events.Bus()
sp := &supervised{
- cfg: cfg,
- events: events,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
+ cfg: cfg,
+ events: eb,
+ eventsID: id,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
}
return sp
@@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventTTL,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
continue
}
@@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventMaxMemory,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
continue
}
@@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ sp.events.Send(&events.RREvent{
+ T: events.EventIdleTTL,
+ P: supervisorName,
+ M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+ })
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index aca379c6..c3abf85e 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -326,14 +326,10 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
},
}
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventMaxMemory {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
+ require.NoError(t, err)
// constructed
// max memory
@@ -344,7 +340,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
cfgExecTTL,
- AddListeners(listener),
)
assert.NoError(t, err)
@@ -359,7 +354,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.Empty(t, resp.Body)
assert.Empty(t, resp.Context)
- <-block
+ <-ch
p.Destroy(context.Background())
}
diff --git a/tests/worker-ok.php b/tests/psr-worker-post.php
index 63558b0f..2f54af5b 100644
--- a/tests/worker-ok.php
+++ b/tests/psr-worker-post.php
@@ -1,14 +1,16 @@
<?php
+
/**
* @var Goridge\RelayInterface $relay
*/
+
use Spiral\Goridge;
use Spiral\RoadRunner;
ini_set('display_errors', 'stderr');
require __DIR__ . "/vendor/autoload.php";
-$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$worker = RoadRunner\Worker::create();
$psr7 = new RoadRunner\Http\PSR7Worker(
$worker,
new \Nyholm\Psr7\Factory\Psr17Factory(),
@@ -19,7 +21,8 @@ $psr7 = new RoadRunner\Http\PSR7Worker(
while ($req = $psr7->waitRequest()) {
try {
$resp = new \Nyholm\Psr7\Response();
- $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? '');
+ $resp->getBody()->write((string) $req->getBody());
+
$psr7->respond($resp);
} catch (\Throwable $e) {
$psr7->getWorker()->error((string)$e);
diff --git a/tests/temporal-worker.php b/tests/temporal-worker.php
deleted file mode 100644
index 5c9c80e6..00000000
--- a/tests/temporal-worker.php
+++ /dev/null
@@ -1,34 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-require __DIR__ . '/vendor/autoload.php';
-
-/**
- * @param string $dir
- * @return array<string>
- */
-$getClasses = static function (string $dir): iterable {
- $files = glob($dir . '/*.php');
-
- foreach ($files as $file) {
- yield substr(basename($file), 0, -4);
- }
-};
-
-$factory = \Temporal\WorkerFactory::create();
-
-$worker = $factory->newWorker('default');
-
-// register all workflows
-foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) {
- $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name);
-}
-
-// register all activity
-foreach ($getClasses(__DIR__ . '/src/Activity') as $name) {
- $class = 'Temporal\\Tests\\Activity\\' . $name;
- $worker->registerActivityImplementations(new $class);
-}
-
-$factory->run();
diff --git a/tests/worker-cors.php b/tests/worker-cors.php
deleted file mode 100644
index ea3c986c..00000000
--- a/tests/worker-cors.php
+++ /dev/null
@@ -1,15 +0,0 @@
-<?php
-
-use Spiral\RoadRunner\Worker;
-use Spiral\RoadRunner\Http\HttpWorker;
-
-ini_set('display_errors', 'stderr');
-require __DIR__ . '/vendor/autoload.php';
-
-$http = new HttpWorker(Worker::create());
-
-while ($req = $http->waitRequest()) {
- $http->respond(200, 'Response', [
- 'Access-Control-Allow-Origin' => ['*']
- ]);
-}
diff --git a/tests/worker-deny.php b/tests/worker-deny.php
deleted file mode 100644
index 6dc993f6..00000000
--- a/tests/worker-deny.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-/**
- * @var Goridge\RelayInterface $relay
- */
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-ini_set('display_errors', 'stderr');
-require __DIR__ . "/vendor/autoload.php";
-
-$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-$psr7 = new RoadRunner\Http\PSR7Worker(
- $worker,
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory()
-);
-
-while ($req = $psr7->waitRequest()) {
- try {
- $resp = new \Nyholm\Psr7\Response();
- if ($req->getAttribute('ws:joinServer')) {
- $psr7->respond($resp->withStatus(200));
- } else {
- $psr7->respond($resp->withStatus(401));
- }
- } catch (\Throwable $e) {
- $psr7->getWorker()->error((string)$e);
- }
-}
diff --git a/tests/worker-origin.php b/tests/worker-origin.php
deleted file mode 100644
index 6ce4de59..00000000
--- a/tests/worker-origin.php
+++ /dev/null
@@ -1,14 +0,0 @@
-<?php
-
-use Spiral\RoadRunner\Worker;
-use Spiral\RoadRunner\Http\HttpWorker;
-
-require __DIR__ . '/vendor/autoload.php';
-
-$http = new HttpWorker(Worker::create());
-
-while ($req = $http->waitRequest()) {
- $http->respond(200, 'Response', [
- 'Access-Control-Allow-Origin' => ['*']
- ]);
-}
diff --git a/tests/worker-stop.php b/tests/worker-stop.php
deleted file mode 100644
index 83fc5710..00000000
--- a/tests/worker-stop.php
+++ /dev/null
@@ -1,26 +0,0 @@
-<?php
-/**
- * @var Goridge\RelayInterface $relay
- */
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-ini_set('display_errors', 'stderr');
-require __DIR__ . "/vendor/autoload.php";
-
-$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-$psr7 = new RoadRunner\Http\PSR7Worker(
- $worker,
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory()
-);
-
-while ($req = $psr7->waitRequest()) {
- try {
- $resp = new \Nyholm\Psr7\Response();
- $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401));
- } catch (\Throwable $e) {
- $psr7->getWorker()->error((string)$e);
- }
-}
diff --git a/transport/interface.go b/transport/interface.go
index e20f2b0b..0d6c8e8b 100644
--- a/transport/interface.go
+++ b/transport/interface.go
@@ -4,7 +4,6 @@ import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/worker"
)
@@ -12,10 +11,10 @@ import (
type Factory interface {
// SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
- SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error)
+ SpawnWorker(*exec.Cmd) (*worker.Process, error)
// Close the factory and underlying connections.
Close() error
}
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 3ea8fd98..c70b3f65 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -5,7 +5,6 @@ import (
"os/exec"
"github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
)
@@ -27,10 +26,10 @@ type sr struct {
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
spCh := make(chan sr)
go func() {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
select {
case spCh <- sr{
@@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 45b7aef8..81004027 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_GetState2(t *testing.T) {
@@ -105,21 +106,20 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Pipe_Invalid2(t *testing.T) {
@@ -368,17 +368,13 @@ func Test_Echo_Slow2(t *testing.T) {
func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -390,11 +386,11 @@ func Test_Broken2(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
+
+ msg := <-ch
+ if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
t.Fail()
}
- mu.Unlock()
assert.Error(t, w.Stop())
}
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index b4ba8c87..8c6d440a 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_GetState(t *testing.T) {
@@ -125,22 +126,20 @@ func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Pipe_Invalid(t *testing.T) {
@@ -433,17 +432,13 @@ func Test_Broken(t *testing.T) {
t.Parallel()
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -455,11 +450,10 @@ func Test_Broken(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
+ msg := <-ch
+ if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
t.Fail()
}
- mu.Unlock()
assert.Error(t, w.Stop())
}
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index dfffdf4e..06d7000d 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/goridge/v3/pkg/socket"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
@@ -83,12 +82,12 @@ type socketSpawn struct {
}
// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
c := make(chan socketSpawn)
go func() {
ctxT, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
select {
case c <- socketSpawn{
@@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 363a3510..45fb3bd5 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_Tcp_Start2(t *testing.T) {
@@ -110,21 +111,19 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err2)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -162,18 +161,12 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -198,7 +191,11 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function() string")
+ }
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -273,21 +270,19 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -331,18 +326,12 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -367,7 +356,11 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index d517d026..11b34999 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_Tcp_Start(t *testing.T) {
@@ -124,21 +125,19 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -203,18 +202,12 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -239,7 +232,11 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
}
func Test_Tcp_Echo(t *testing.T) {
@@ -368,21 +365,19 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Unix_Timeout(t *testing.T) {
@@ -444,20 +439,12 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -481,7 +468,12 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
+
wg.Wait()
}
diff --git a/worker/worker.go b/worker/worker.go
index 38a1e9ac..5973adc6 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -12,18 +12,24 @@ import (
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/utils"
"go.uber.org/multierr"
)
type Options func(p *Process)
+const (
+ workerEventsName string = "worker"
+)
+
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
created time.Time
// updates parent supervisor or pool about Process events
- events events.Handler
+ events events.EventBus
+ eventsID string
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
+
+ eb, id := events.Bus()
w := &Process{
- created: time.Now(),
- events: events.NewEventsHandler(),
- cmd: cmd,
- state: NewWorkerState(StateInactive),
+ created: time.Now(),
+ events: eb,
+ eventsID: id,
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
return w, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *Process) {
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -85,11 +86,6 @@ func (w *Process) Created() time.Time {
return w.created
}
-// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.Listener) {
- w.events.AddListener(listener)
-}
-
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
func (w *Process) State() State {
@@ -166,6 +162,8 @@ func (w *Process) Wait() error {
return nil
}
+ w.events.Unsubscribe(w.eventsID)
+
return err
}
@@ -187,9 +185,13 @@ func (w *Process) Stop() error {
if err != nil {
w.state.Set(StateKilling)
_ = w.cmd.Process.Signal(os.Kill)
+
+ w.events.Unsubscribe(w.eventsID)
return errors.E(op, errors.Network, err)
}
+
w.state.Set(StateStopped)
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -201,6 +203,8 @@ func (w *Process) Kill() error {
if err != nil {
return err
}
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -210,11 +214,18 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ w.events.Send(&events.RREvent{
+ T: events.EventWorkerStderr,
+ P: workerEventsName,
+ M: utils.AsString(p),
+ })
+
return len(p), nil
}
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 175972e0..871e6146 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -13,6 +14,10 @@ import (
"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
)
+const (
+ wwName string = "worker_watcher"
+)
+
// Vector interface represents vector container
type Vector interface {
// Push used to put worker to the vector
@@ -34,25 +39,28 @@ type workerWatcher struct {
// used to control Destroy stage (that all workers are in the container)
numWorkers *uint64
- workers []worker.BaseProcess
+ workers []worker.BaseProcess
+ events events.EventBus
+ eventsID string
allocator worker.Allocator
allocateTimeout time.Duration
- events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+ eb, id := events.Bus()
ww := &workerWatcher{
container: channel.NewVector(numWorkers),
+ events: eb,
+ eventsID: id,
// pass a ptr to the number of workers to avoid blocking in the TTL loop
numWorkers: utils.Uint64(numWorkers),
allocateTimeout: allocateTimeout,
workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
- events: events,
}
return ww
@@ -140,11 +148,11 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
- })
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: wwName,
+ M: fmt.Sprintf("can't allocate the worker: %v", err),
+ })
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
@@ -168,11 +176,11 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
- })
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerError,
+ P: wwName,
+ M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err),
+ })
continue
}
@@ -234,6 +242,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
ww.container.Destroy()
ww.Unlock()
+ ww.events.Unsubscribe(ww.eventsID)
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
for { //nolint:gosimple
@@ -278,9 +287,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Push(events.WorkerEvent{
- Event: events.EventWorkerWaitExit,
- Payload: err,
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerWaitExit,
+ P: wwName,
+ M: fmt.Sprintf("error: %v", err),
})
}
@@ -289,7 +299,12 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerDestruct,
+ P: wwName,
+ M: fmt.Sprintf("pid: %d", w.Pid()),
+ })
+
return
}
@@ -298,9 +313,10 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Push(events.PoolEvent{
- Event: events.EventWorkerProcessExit,
- Error: errors.E(op, err),
+ ww.events.Send(&events.RREvent{
+ T: events.EventWorkerProcessExit,
+ P: wwName,
+ M: fmt.Sprintf("error: %v", err),
})
// no workers at all, panic