summaryrefslogtreecommitdiff
path: root/events
diff options
context:
space:
mode:
Diffstat (limited to 'events')
-rw-r--r--events/docs/events.md0
-rw-r--r--events/events.go143
-rw-r--r--events/events_test.go94
-rw-r--r--events/eventsbus.go170
-rwxr-xr-xevents/general.go41
-rw-r--r--events/grpc_event.go39
-rw-r--r--events/init.go20
-rw-r--r--events/interface.go14
-rw-r--r--events/jobs_events.go81
-rw-r--r--events/pool_events.go71
-rw-r--r--events/types.go46
-rw-r--r--events/wildcard.go43
-rw-r--r--events/wildcard_test.go48
-rw-r--r--events/worker_events.go40
14 files changed, 564 insertions, 286 deletions
diff --git a/events/docs/events.md b/events/docs/events.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/events/docs/events.md
diff --git a/events/events.go b/events/events.go
new file mode 100644
index 00000000..b7396653
--- /dev/null
+++ b/events/events.go
@@ -0,0 +1,143 @@
+package events
+
+type EventType uint32
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk EventType = iota
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+ // EventWorkerWaitExit is the worker exit event
+ EventWorkerWaitExit
+)
+
+func (et EventType) String() string {
+ switch et {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ case EventWorkerWaitExit:
+ return "EventWorkerWaitExit"
+
+ default:
+ return "UnknownEventType"
+ }
+}
diff --git a/events/events_test.go b/events/events_test.go
new file mode 100644
index 00000000..e15c55d6
--- /dev/null
+++ b/events/events_test.go
@@ -0,0 +1,94 @@
+package events
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestEvenHandler(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
+
+func TestEvenHandler2(t *testing.T) {
+ eh, id := Bus()
+ eh2, id2 := Bus()
+ defer eh.Unsubscribe(id)
+ defer eh2.Unsubscribe(id2)
+
+ ch := make(chan Event, 100)
+ ch2 := make(chan Event, 100)
+ err := eh2.SubscribeP(id2, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ err = eh.SubscribeP(id, "http.EventJobOK", ch2)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
+
+ evt := <-ch2
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+
+ l := eh.Len()
+ require.Equal(t, uint(2), l)
+
+ eh.Unsubscribe(id)
+ time.Sleep(time.Second)
+
+ l = eh.Len()
+ require.Equal(t, uint(1), l)
+
+ eh2.Unsubscribe(id2)
+ time.Sleep(time.Second)
+
+ l = eh.Len()
+ require.Equal(t, uint(0), l)
+}
+
+func TestEvenHandler3(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "EventJobOK", ch)
+ require.Error(t, err)
+}
+
+func TestEvenHandler4(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ err := eh.SubscribeP(id, "EventJobOK", nil)
+ require.Error(t, err)
+}
+
+func TestEvenHandler5(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
diff --git a/events/eventsbus.go b/events/eventsbus.go
new file mode 100644
index 00000000..cd0dca71
--- /dev/null
+++ b/events/eventsbus.go
@@ -0,0 +1,170 @@
+package events
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+
+ "github.com/spiral/errors"
+)
+
+type sub struct {
+ pattern string
+ w *wildcard
+ events chan<- Event
+}
+
+type eventsBus struct {
+ sync.RWMutex
+ subscribers sync.Map
+ internalEvCh chan Event
+ stop chan struct{}
+}
+
+func newEventsBus() *eventsBus {
+ return &eventsBus{
+ internalEvCh: make(chan Event, 100),
+ stop: make(chan struct{}),
+ }
+}
+
+/*
+http.* <-
+*/
+
+// SubscribeAll for all RR events
+// returns subscriptionID
+func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+
+ if subIDTr == "" {
+ return errors.Str("subscriberID can't be empty")
+ }
+
+ return eb.subscribe(subID, "*", ch)
+}
+
+// SubscribeP pattern like "pluginName.EventType"
+func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+ patternTr := strings.Trim(pattern, " ")
+
+ if subIDTr == "" || patternTr == "" {
+ return errors.Str("subscriberID or pattern can't be empty")
+ }
+
+ return eb.subscribe(subID, pattern, ch)
+}
+
+func (eb *eventsBus) Unsubscribe(subID string) {
+ eb.subscribers.Delete(subID)
+}
+
+func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ eb.Lock()
+ defer eb.Unlock()
+
+ sbArr := sb.([]*sub)
+
+ for i := 0; i < len(sbArr); i++ {
+ if sbArr[i].pattern == pattern {
+ sbArr[i] = sbArr[len(sbArr)-1]
+ sbArr = sbArr[:len(sbArr)-1]
+ // replace with new array
+ eb.subscribers.Store(subID, sbArr)
+ return
+ }
+ }
+ }
+}
+
+// Send sends event to the events bus
+func (eb *eventsBus) Send(ev Event) {
+ // do not accept nil events
+ if ev == nil {
+ return
+ }
+
+ eb.internalEvCh <- ev
+}
+
+func (eb *eventsBus) Len() uint {
+ var ln uint
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ ln++
+ return true
+ })
+
+ return ln
+}
+
+func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
+ eb.Lock()
+ defer eb.Unlock()
+ w, err := newWildcard(pattern)
+ if err != nil {
+ return err
+ }
+
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ // at this point we are confident that sb is a []*sub type
+ subArr := sb.([]*sub)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+ }
+
+ subArr := make([]*sub, 0, 5)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+}
+
+func (eb *eventsBus) handleEvents() {
+ for { //nolint:gosimple
+ select {
+ case ev := <-eb.internalEvCh:
+ // http.WorkerError for example
+ wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ vsub := value.([]*sub)
+
+ for i := 0; i < len(vsub); i++ {
+ if vsub[i].w.match(wc) {
+ select {
+ case vsub[i].events <- ev:
+ return true
+ default:
+ return true
+ }
+ }
+ }
+
+ return true
+ })
+ }
+ }
+}
diff --git a/events/general.go b/events/general.go
deleted file mode 100755
index 5cf13e10..00000000
--- a/events/general.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package events
-
-import (
- "sync"
-)
-
-const UnknownEventType string = "Unknown event type"
-
-// HandlerImpl helps to broadcast events to multiple listeners.
-type HandlerImpl struct {
- listeners []Listener
- sync.RWMutex // all receivers should be pointers
-}
-
-func NewEventsHandler() Handler {
- return &HandlerImpl{listeners: make([]Listener, 0, 2)}
-}
-
-// NumListeners returns number of event listeners.
-func (eb *HandlerImpl) NumListeners() int {
- eb.Lock()
- defer eb.Unlock()
- return len(eb.listeners)
-}
-
-// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener Listener) {
- eb.Lock()
- defer eb.Unlock()
- eb.listeners = append(eb.listeners, listener)
-}
-
-// Push broadcast events across all event listeners.
-func (eb *HandlerImpl) Push(e interface{}) {
- // ReadLock here because we are not changing listeners
- eb.RLock()
- defer eb.RUnlock()
- for k := range eb.listeners {
- eb.listeners[k](e)
- }
-}
diff --git a/events/grpc_event.go b/events/grpc_event.go
deleted file mode 100644
index 31ff4957..00000000
--- a/events/grpc_event.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package events
-
-import (
- "time"
-
- "google.golang.org/grpc"
-)
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk G = iota + 13000
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-)
-
-type G int64
-
-func (ev G) String() string {
- switch ev {
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type GRPCEvent struct {
- Event G
- // Info contains unary call info.
- Info *grpc.UnaryServerInfo
- // Error associated with event.
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/init.go b/events/init.go
new file mode 100644
index 00000000..25e92fc5
--- /dev/null
+++ b/events/init.go
@@ -0,0 +1,20 @@
+package events
+
+import (
+ "sync"
+
+ "github.com/google/uuid"
+)
+
+var evBus *eventsBus
+var onInit = &sync.Once{}
+
+func Bus() (*eventsBus, string) {
+ onInit.Do(func() {
+ evBus = newEventsBus()
+ go evBus.handleEvents()
+ })
+
+ // return events bus with subscriberID
+ return evBus, uuid.NewString()
+}
diff --git a/events/interface.go b/events/interface.go
deleted file mode 100644
index 7d57e4d0..00000000
--- a/events/interface.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package events
-
-// Handler interface
-type Handler interface {
- // NumListeners return number of active listeners
- NumListeners() int
- // AddListener adds lister to the publisher
- AddListener(listener Listener)
- // Push pushes event to the listeners
- Push(e interface{})
-}
-
-// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
-type Listener func(event interface{})
diff --git a/events/jobs_events.go b/events/jobs_events.go
deleted file mode 100644
index f65ede67..00000000
--- a/events/jobs_events.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package events
-
-import (
- "time"
-)
-
-const (
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK J = iota + 12000
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-)
-
-type J int64
-
-func (ev J) String() string {
- switch ev {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type JobEvent struct {
- Event J
- // String is job id.
- ID string
- // Pipeline name
- Pipeline string
- // Associated driver name (amqp, ephemeral, etc)
- Driver string
- // Error for the jobs/pipes errors
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/pool_events.go b/events/pool_events.go
deleted file mode 100644
index eb28df6a..00000000
--- a/events/pool_events.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package events
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct P = iota + 10000
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-)
-
-type P int64
-
-func (ev P) String() string {
- switch ev {
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
- }
- return UnknownEventType
-}
-
-// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
-type PoolEvent struct {
- // Event type, see below.
- Event P
-
- // Payload depends on event type, typically it's worker or error.
- Payload interface{}
- Error error
-}
diff --git a/events/types.go b/events/types.go
new file mode 100644
index 00000000..65a76d15
--- /dev/null
+++ b/events/types.go
@@ -0,0 +1,46 @@
+package events
+
+type EventBus interface {
+ SubscribeAll(subID string, ch chan<- Event) error
+ SubscribeP(subID string, pattern string, ch chan<- Event) error
+ Unsubscribe(subID string)
+ UnsubscribeP(subID, pattern string)
+ Len() uint
+ Send(ev Event)
+}
+
+type Event interface {
+ Plugin() string
+ Type() EventType
+ Message() string
+}
+
+type RREvent struct {
+ // event typ
+ typ EventType
+ // plugin
+ plugin string
+ // message
+ message string
+}
+
+// NewEvent initializes new event
+func NewEvent(t EventType, plugin string, msg string) *RREvent {
+ return &RREvent{
+ typ: t,
+ plugin: plugin,
+ message: msg,
+ }
+}
+
+func (r *RREvent) Type() EventType {
+ return r.typ
+}
+
+func (r *RREvent) Message() string {
+ return r.message
+}
+
+func (r *RREvent) Plugin() string {
+ return r.plugin
+}
diff --git a/events/wildcard.go b/events/wildcard.go
new file mode 100644
index 00000000..b4c28ae1
--- /dev/null
+++ b/events/wildcard.go
@@ -0,0 +1,43 @@
+package events
+
+import (
+ "strings"
+
+ "github.com/spiral/errors"
+)
+
+type wildcard struct {
+ prefix string
+ suffix string
+}
+
+func newWildcard(pattern string) (*wildcard, error) {
+ // Normalize
+ origin := strings.ToLower(pattern)
+ i := strings.IndexByte(origin, '*')
+
+ /*
+ http.*
+ *
+ *.WorkerError
+ */
+ if i == -1 {
+ dotI := strings.IndexByte(pattern, '.')
+
+ if dotI == -1 {
+ // http.SuperEvent
+ return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*")
+ }
+
+ return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil
+ }
+
+ // pref: http.
+ // suff: *
+ return &wildcard{origin[0:i], origin[i+1:]}, nil
+}
+
+func (w wildcard) match(s string) bool {
+ s = strings.ToLower(s)
+ return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix)
+}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
new file mode 100644
index 00000000..230ef673
--- /dev/null
+++ b/events/wildcard_test.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWildcard(t *testing.T) {
+ w, err := newWildcard("http.*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ // *.* -> *
+ w, err = newWildcard("*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.True(t, w.match("https.SuperEvent"))
+ assert.True(t, w.match(""))
+ assert.True(t, w.match("*"))
+ assert.True(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ w, err = newWildcard("*.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+
+ w, err = newWildcard("http.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+}
diff --git a/events/worker_events.go b/events/worker_events.go
deleted file mode 100644
index 6b80df61..00000000
--- a/events/worker_events.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package events
-
-const (
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError W = iota + 11000
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
-)
-
-type W int64
-
-func (ev W) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
- }
- return UnknownEventType
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see below.
- Event W
-
- // Worker triggered the event.
- Worker interface{}
-
- // Event specific payload.
- Payload interface{}
-}