summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-26 13:13:59 +0300
committerValery Piashchynski <[email protected]>2020-12-26 13:13:59 +0300
commit1aaf6e6ffb015cd5a21d9d938ad84c18723973c5 (patch)
tree7221427ba25f8f99a10720778a420703dc2bff92 /pkg
parentde4d0b6e66ceda08b0daec18a218c1baa71ebf04 (diff)
Reorganize eventsv2.0.0-beta4
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/events/events.go23
-rwxr-xr-xpkg/pipe/pipe_factory.go4
-rwxr-xr-xpkg/pool/static_pool.go6
-rwxr-xr-xpkg/socket/socket_factory.go4
-rwxr-xr-xpkg/worker/worker.go4
5 files changed, 22 insertions, 19 deletions
diff --git a/pkg/events/events.go b/pkg/events/events.go
index 92dc103a..226a0c91 100755
--- a/pkg/events/events.go
+++ b/pkg/events/events.go
@@ -6,32 +6,35 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
)
-// EventHandler helps to broadcast events to multiple listeners.
-type EventHandler struct {
- listeners []events.EventListener
- sync.RWMutex
+// HandlerImpl helps to broadcast events to multiple listeners.
+type HandlerImpl struct {
+ listeners []events.Listener
+ sync.RWMutex // all receivers should be pointers
}
func NewEventsHandler() events.Handler {
- return &EventHandler{listeners: make([]events.EventListener, 0, 2)}
+ return &HandlerImpl{listeners: make([]events.Listener, 0, 2)}
}
// NumListeners returns number of event listeners.
-func (eb *EventHandler) NumListeners() int {
+func (eb *HandlerImpl) NumListeners() int {
+ eb.Lock()
+ defer eb.Unlock()
return len(eb.listeners)
}
// AddListener registers new event listener.
-func (eb *EventHandler) AddListener(listener events.EventListener) {
+func (eb *HandlerImpl) AddListener(listener events.Listener) {
eb.Lock()
defer eb.Unlock()
eb.listeners = append(eb.listeners, listener)
}
// Push broadcast events across all event listeners.
-func (eb *EventHandler) Push(e interface{}) {
- eb.Lock()
- defer eb.Unlock()
+func (eb *HandlerImpl) Push(e interface{}) {
+ // ReadLock here because we are not changing listeners
+ eb.RLock()
+ defer eb.RUnlock()
for k := range eb.listeners {
eb.listeners[k](e)
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index ecb3fa71..c36c13e2 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -30,7 +30,7 @@ type SpawnResult struct {
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
@@ -113,7 +113,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("spawn worker")
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 23bb2d5f..808e7d35 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -40,7 +40,7 @@ type StaticPool struct {
events events.Handler
// saved list of event listeners
- listeners []events.EventListener
+ listeners []events.Listener
// manages worker states and TTLs
ww worker.Watcher
@@ -104,7 +104,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
return p, nil
}
-func AddListeners(listeners ...events.EventListener) Options {
+func AddListeners(listeners ...events.Listener) Options {
return func(p *StaticPool) {
p.listeners = listeners
for i := 0; i < len(listeners); i++ {
@@ -114,7 +114,7 @@ func AddListeners(listeners ...events.EventListener) Options {
}
// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.EventListener) {
+func (sp *StaticPool) addListener(listener events.Listener) {
sp.events.AddListener(listener)
}
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index 38b3e7c9..ff882389 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -85,7 +85,7 @@ type socketSpawn struct {
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
@@ -145,7 +145,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker")
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index db182a3e..493882a8 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -117,7 +117,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, erro
return w, nil
}
-func AddListeners(listeners ...events.EventListener) Options {
+func AddListeners(listeners ...events.Listener) Options {
return func(p *Process) {
for i := 0; i < len(listeners); i++ {
p.addListener(listeners[i])
@@ -136,7 +136,7 @@ func (w *Process) Created() time.Time {
}
// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.EventListener) {
+func (w *Process) addListener(listener events.Listener) {
w.events.AddListener(listener)
}