diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/events/events.go | 23 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 6 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 4 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 4 |
5 files changed, 22 insertions, 19 deletions
diff --git a/pkg/events/events.go b/pkg/events/events.go index 92dc103a..226a0c91 100755 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -6,32 +6,35 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" ) -// EventHandler helps to broadcast events to multiple listeners. -type EventHandler struct { - listeners []events.EventListener - sync.RWMutex +// HandlerImpl helps to broadcast events to multiple listeners. +type HandlerImpl struct { + listeners []events.Listener + sync.RWMutex // all receivers should be pointers } func NewEventsHandler() events.Handler { - return &EventHandler{listeners: make([]events.EventListener, 0, 2)} + return &HandlerImpl{listeners: make([]events.Listener, 0, 2)} } // NumListeners returns number of event listeners. -func (eb *EventHandler) NumListeners() int { +func (eb *HandlerImpl) NumListeners() int { + eb.Lock() + defer eb.Unlock() return len(eb.listeners) } // AddListener registers new event listener. -func (eb *EventHandler) AddListener(listener events.EventListener) { +func (eb *HandlerImpl) AddListener(listener events.Listener) { eb.Lock() defer eb.Unlock() eb.listeners = append(eb.listeners, listener) } // Push broadcast events across all event listeners. -func (eb *EventHandler) Push(e interface{}) { - eb.Lock() - defer eb.Unlock() +func (eb *HandlerImpl) Push(e interface{}) { + // ReadLock here because we are not changing listeners + eb.RLock() + defer eb.RUnlock() for k := range eb.listeners { eb.listeners[k](e) } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index ecb3fa71..c36c13e2 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -30,7 +30,7 @@ type SpawnResult struct { // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { @@ -113,7 +113,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("spawn worker") w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 23bb2d5f..808e7d35 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -40,7 +40,7 @@ type StaticPool struct { events events.Handler // saved list of event listeners - listeners []events.EventListener + listeners []events.Listener // manages worker states and TTLs ww worker.Watcher @@ -104,7 +104,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co return p, nil } -func AddListeners(listeners ...events.EventListener) Options { +func AddListeners(listeners ...events.Listener) Options { return func(p *StaticPool) { p.listeners = listeners for i := 0; i < len(listeners); i++ { @@ -114,7 +114,7 @@ func AddListeners(listeners ...events.EventListener) Options { } // AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.EventListener) { +func (sp *StaticPool) addListener(listener events.Listener) { sp.events.AddListener(listener) } diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index 38b3e7c9..ff882389 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -85,7 +85,7 @@ type socketSpawn struct { } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker_with_context") c := make(chan socketSpawn) go func() { @@ -145,7 +145,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker") w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index db182a3e..493882a8 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -117,7 +117,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, erro return w, nil } -func AddListeners(listeners ...events.EventListener) Options { +func AddListeners(listeners ...events.Listener) Options { return func(p *Process) { for i := 0; i < len(listeners); i++ { p.addListener(listeners[i]) @@ -136,7 +136,7 @@ func (w *Process) Created() time.Time { } // AddListener registers new worker event listener. -func (w *Process) addListener(listener events.EventListener) { +func (w *Process) addListener(listener events.Listener) { w.events.AddListener(listener) } |