summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-26 13:13:59 +0300
committerValery Piashchynski <[email protected]>2020-12-26 13:13:59 +0300
commit1aaf6e6ffb015cd5a21d9d938ad84c18723973c5 (patch)
tree7221427ba25f8f99a10720778a420703dc2bff92
parentde4d0b6e66ceda08b0daec18a218c1baa71ebf04 (diff)
Reorganize eventsv2.0.0-beta4
-rwxr-xr-xMakefile5
-rw-r--r--interfaces/events/handler.go8
-rw-r--r--interfaces/events/pool_events.go8
-rw-r--r--interfaces/events/worker_events.go9
-rw-r--r--interfaces/worker/factory.go4
-rwxr-xr-xpkg/events/events.go23
-rwxr-xr-xpkg/pipe/pipe_factory.go4
-rwxr-xr-xpkg/pool/static_pool.go6
-rwxr-xr-xpkg/socket/socket_factory.go4
-rwxr-xr-xpkg/worker/worker.go4
-rw-r--r--plugins/http/handler.go6
-rw-r--r--plugins/server/interface.go4
-rw-r--r--plugins/server/plugin.go8
13 files changed, 49 insertions, 44 deletions
diff --git a/Makefile b/Makefile
index 01f7d205..7c0cabbe 100755
--- a/Makefile
+++ b/Makefile
@@ -12,7 +12,7 @@ help: ## Show this help
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z0-9_-]+:.*?## / {printf " \033[32m%-14s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
build: ## Build RR binary file for local os/arch
- CGO_ENABLED=0 go build -trimpath -ldflags "-s" -o ./rr ./cmd/rr/main.go
+ CGO_ENABLED=0 go build -trimpath -ldflags "-s" -o ./rr ./cmd/main.go
clean: ## Make some clean
rm ./rr
@@ -46,5 +46,4 @@ test: ## Run application tests
go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/static
lint: ## Run application linters
- go fmt ./...
- golint ./...
+ golangci-lint run
diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go
index 01f64d73..ac6c15a4 100644
--- a/interfaces/events/handler.go
+++ b/interfaces/events/handler.go
@@ -1,10 +1,14 @@
package events
+// Handler interface
type Handler interface {
+ // Return number of active listeners
NumListeners() int
- AddListener(listener EventListener)
+ // AddListener adds lister to the publisher
+ AddListener(listener Listener)
+ // Push pushes event to the listeners
Push(e interface{})
}
// Event listener listens for the events produced by worker, worker pool or other service.
-type EventListener func(event interface{})
+type Listener func(event interface{})
diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go
index cc32f6b2..d1464e1a 100644
--- a/interfaces/events/pool_events.go
+++ b/interfaces/events/pool_events.go
@@ -16,16 +16,16 @@ const (
// EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
EventNoFreeWorkers
- // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ // EventMaxMemory caused when worker consumes more memory than allowed.
EventMaxMemory
- // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
EventTTL
- // todo: EventIdleTTL triggered when worker spends too much time at rest.
+ // EventIdleTTL triggered when worker spends too much time at rest.
EventIdleTTL
- // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
)
diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go
index 497f0a06..2bff1811 100644
--- a/interfaces/events/worker_events.go
+++ b/interfaces/events/worker_events.go
@@ -1,17 +1,16 @@
package events
-// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError E = iota + 200
+ EventWorkerError W = iota + 200
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
)
-type E int64
+type W int64
-func (ev E) String() string {
+func (ev W) String() string {
switch ev {
case EventWorkerError:
return "EventWorkerError"
@@ -24,7 +23,7 @@ func (ev E) String() string {
// WorkerEvent wraps worker events.
type WorkerEvent struct {
// Event id, see below.
- Event E
+ Event W
// Worker triggered the event.
Worker interface{}
diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go
index 8412428d..376303df 100644
--- a/interfaces/worker/factory.go
+++ b/interfaces/worker/factory.go
@@ -11,10 +11,10 @@ import (
type Factory interface {
// SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.EventListener) (BaseProcess, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (BaseProcess, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
- SpawnWorker(*exec.Cmd, ...events.EventListener) (BaseProcess, error)
+ SpawnWorker(*exec.Cmd, ...events.Listener) (BaseProcess, error)
// Close the factory and underlying connections.
Close() error
}
diff --git a/pkg/events/events.go b/pkg/events/events.go
index 92dc103a..226a0c91 100755
--- a/pkg/events/events.go
+++ b/pkg/events/events.go
@@ -6,32 +6,35 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
)
-// EventHandler helps to broadcast events to multiple listeners.
-type EventHandler struct {
- listeners []events.EventListener
- sync.RWMutex
+// HandlerImpl helps to broadcast events to multiple listeners.
+type HandlerImpl struct {
+ listeners []events.Listener
+ sync.RWMutex // all receivers should be pointers
}
func NewEventsHandler() events.Handler {
- return &EventHandler{listeners: make([]events.EventListener, 0, 2)}
+ return &HandlerImpl{listeners: make([]events.Listener, 0, 2)}
}
// NumListeners returns number of event listeners.
-func (eb *EventHandler) NumListeners() int {
+func (eb *HandlerImpl) NumListeners() int {
+ eb.Lock()
+ defer eb.Unlock()
return len(eb.listeners)
}
// AddListener registers new event listener.
-func (eb *EventHandler) AddListener(listener events.EventListener) {
+func (eb *HandlerImpl) AddListener(listener events.Listener) {
eb.Lock()
defer eb.Unlock()
eb.listeners = append(eb.listeners, listener)
}
// Push broadcast events across all event listeners.
-func (eb *EventHandler) Push(e interface{}) {
- eb.Lock()
- defer eb.Unlock()
+func (eb *HandlerImpl) Push(e interface{}) {
+ // ReadLock here because we are not changing listeners
+ eb.RLock()
+ defer eb.RUnlock()
for k := range eb.listeners {
eb.listeners[k](e)
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index ecb3fa71..c36c13e2 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -30,7 +30,7 @@ type SpawnResult struct {
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
@@ -113,7 +113,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("spawn worker")
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 23bb2d5f..808e7d35 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -40,7 +40,7 @@ type StaticPool struct {
events events.Handler
// saved list of event listeners
- listeners []events.EventListener
+ listeners []events.Listener
// manages worker states and TTLs
ww worker.Watcher
@@ -104,7 +104,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
return p, nil
}
-func AddListeners(listeners ...events.EventListener) Options {
+func AddListeners(listeners ...events.Listener) Options {
return func(p *StaticPool) {
p.listeners = listeners
for i := 0; i < len(listeners); i++ {
@@ -114,7 +114,7 @@ func AddListeners(listeners ...events.EventListener) Options {
}
// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.EventListener) {
+func (sp *StaticPool) addListener(listener events.Listener) {
sp.events.AddListener(listener)
}
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index 38b3e7c9..ff882389 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -85,7 +85,7 @@ type socketSpawn struct {
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
@@ -145,7 +145,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker")
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index db182a3e..493882a8 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -117,7 +117,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, erro
return w, nil
}
-func AddListeners(listeners ...events.EventListener) Options {
+func AddListeners(listeners ...events.Listener) Options {
return func(p *Process) {
for i := 0; i < len(listeners); i++ {
p.addListener(listeners[i])
@@ -136,7 +136,7 @@ func (w *Process) Created() time.Time {
}
// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.EventListener) {
+func (w *Process) addListener(listener events.Listener) {
w.events.AddListener(listener)
}
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index 1889ed6d..15954f96 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -26,7 +26,7 @@ const (
const MB = 1024 * 1024
type Handle interface {
- AddListener(l events.EventListener)
+ AddListener(l events.Listener)
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
@@ -75,7 +75,7 @@ type handler struct {
log logger.Logger
pool pool.Pool
mul sync.Mutex
- lsn events.EventListener
+ lsn events.Listener
}
func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) {
@@ -91,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool po
}
// Listen attaches handler event controller.
-func (h *handler) AddListener(l events.EventListener) {
+func (h *handler) AddListener(l events.Listener) {
h.mul.Lock()
defer h.mul.Unlock()
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 9c1079ea..98345694 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -15,6 +15,6 @@ type Env map[string]string
// Server creates workers for the application.
type Server interface {
CmdFactory(env Env) (func() *exec.Cmd, error)
- NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error)
- NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error)
+ NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error)
+ NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index b280d253..5b023bc6 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -104,10 +104,10 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) {
const op = errors.Op("new worker")
- list := make([]events.EventListener, 0, len(listeners))
+ list := make([]events.Listener, 0, len(listeners))
list = append(list, server.collectWorkerLogs)
spawnCmd, err := server.CmdFactory(env)
@@ -124,14 +124,14 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
const op = errors.Op("server plugins new worker pool")
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
}
- list := make([]events.EventListener, 0, len(listeners))
+ list := make([]events.Listener, 0, len(listeners))
list = append(list, server.collectPoolLogs)
p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...))