summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/build.yml12
-rwxr-xr-xMakefile6
-rw-r--r--interfaces/events/handler.go10
-rw-r--r--interfaces/events/pool_events.go65
-rw-r--r--interfaces/events/worker_events.go34
-rwxr-xr-xinterfaces/factory/factory.go2
-rw-r--r--interfaces/pool/pool.go102
-rw-r--r--interfaces/worker/factory.go18
-rw-r--r--interfaces/worker/watcher.go26
-rw-r--r--interfaces/worker/worker.go62
-rwxr-xr-xpkg/pipe/pipe_factory_test.go4
-rwxr-xr-xpkg/pool/static_pool.go19
-rwxr-xr-xpkg/pool/static_pool_test.go11
-rwxr-xr-xpkg/pool/supervisor_pool.go19
-rwxr-xr-xpkg/worker/sync_worker.go3
-rwxr-xr-xpkg/worker/worker.go9
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go18
-rw-r--r--plugins/http/handler.go8
-rw-r--r--plugins/http/plugin.go11
-rw-r--r--plugins/http/tests/http_test.go4
-rw-r--r--plugins/server/plugin.go11
-rwxr-xr-xutil/events.go12
22 files changed, 401 insertions, 65 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index f69d672a..204173cd 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -65,7 +65,11 @@ jobs:
- name: Run golang tests on Windows without codecov
if: ${{ matrix.os == 'windows-latest' }}
run: |
- go test -v -race -cover -tags=debug .
+ go test -v -race -cover -tags=debug ./pkg/pipe
+ go test -v -race -cover -tags=debug ./pkg/pool
+ go test -v -race -cover -tags=debug ./pkg/socket
+ go test -v -race -cover -tags=debug ./pkg/worker
+ go test -v -race -cover -tags=debug ./pkg/worker_watcher
go test -v -race -cover -tags=debug ./plugins/rpc
go test -v -race -cover -tags=debug ./plugins/rpc/tests
go test -v -race -cover -tags=debug ./plugins/config/tests
@@ -86,7 +90,11 @@ jobs:
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }}
run: |
mkdir ./coverage-ci
- go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/lib.txt -covermode=atomic .
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/worker_watcher.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc_config.txt -covermode=atomic ./plugins/rpc
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./plugins/rpc/tests
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/plugin_config.txt -covermode=atomic ./plugins/config/tests
diff --git a/Makefile b/Makefile
index 487af5c8..9f86fa4a 100755
--- a/Makefile
+++ b/Makefile
@@ -24,7 +24,11 @@ uninstall: ## Uninstall locally installed RR
rm -f /usr/local/bin/rr
test: ## Run application tests
- go test -v -race -cover -tags=debug -covermode=atomic .
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker
+ go test -v -race -cover -tags=debug -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc/tests
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/config/tests
diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go
new file mode 100644
index 00000000..01f64d73
--- /dev/null
+++ b/interfaces/events/handler.go
@@ -0,0 +1,10 @@
+package events
+
+type Handler interface {
+ NumListeners() int
+ AddListener(listener EventListener)
+ Push(e interface{})
+}
+
+// Event listener listens for the events produced by worker, worker pool or other service.
+type EventListener func(event interface{})
diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go
new file mode 100644
index 00000000..cc32f6b2
--- /dev/null
+++ b/interfaces/events/pool_events.go
@@ -0,0 +1,65 @@
+package events
+
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 7800
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ EventTTL
+
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go
new file mode 100644
index 00000000..497f0a06
--- /dev/null
+++ b/interfaces/events/worker_events.go
@@ -0,0 +1,34 @@
+package events
+
+// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError E = iota + 200
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+)
+
+type E int64
+
+func (ev E) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event E
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}
diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go
index 036ff4e7..51b73501 100755
--- a/interfaces/factory/factory.go
+++ b/interfaces/factory/factory.go
@@ -4,7 +4,7 @@ import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
)
// Factory is responsible of wrapping given command into tasks WorkerProcess.
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go
index 4eadf064..a1015fd6 100644
--- a/interfaces/pool/pool.go
+++ b/interfaces/pool/pool.go
@@ -1 +1,103 @@
package pool
+
+import (
+ "context"
+ "runtime"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // AddListener connects event listener to the pool.
+ AddListener(listener events.EventListener)
+
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec
+ Exec(rqs internal.Payload) (internal.Payload, error)
+
+ ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []worker.BaseProcess)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker worker.BaseProcess) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
+
+// Configures the pool behaviour.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers int64
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs int64
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = int64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL uint64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL uint64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL uint64
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = 1
+ }
+}
diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go
new file mode 100644
index 00000000..19e2bf5d
--- /dev/null
+++ b/interfaces/worker/factory.go
@@ -0,0 +1,18 @@
+package worker
+
+import (
+ "context"
+ "os/exec"
+)
+
+// Factory is responsible of wrapping given command into tasks WorkerProcess.
+type Factory interface {
+ // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // Process must not be started.
+ SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error)
+ // SpawnWorker creates new WorkerProcess process based on given command.
+ // Process must not be started.
+ SpawnWorker(*exec.Cmd) (BaseProcess, error)
+ // Close the factory and underlying connections.
+ Close(ctx context.Context) error
+}
diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go
new file mode 100644
index 00000000..ce2c1c5a
--- /dev/null
+++ b/interfaces/worker/watcher.go
@@ -0,0 +1,26 @@
+package worker
+
+import "context"
+
+type Watcher interface {
+ // AddToWatch used to add stack to wait its state
+ AddToWatch(workers []BaseProcess) error
+
+ // GetFreeWorker provide first free worker
+ GetFreeWorker(ctx context.Context) (BaseProcess, error)
+
+ // PutWorker enqueues worker back
+ PushWorker(w BaseProcess)
+
+ // AllocateNew used to allocate new worker and put in into the WorkerWatcher
+ AllocateNew() error
+
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+
+ // WorkersList return all stack w/o removing it from internal storage
+ WorkersList() []BaseProcess
+
+ // RemoveWorker remove worker from the stack
+ RemoveWorker(wb BaseProcess) error
+}
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
new file mode 100644
index 00000000..edbc68d9
--- /dev/null
+++ b/interfaces/worker/worker.go
@@ -0,0 +1,62 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (BaseProcess, error)
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // AddListener attaches listener to consume worker events.
+ AddListener(listener events.EventListener)
+
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() internal.State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop(ctx context.Context) error
+
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill() error
+
+ // Relay returns attached to worker goridge relay
+ Relay() goridge.Relay
+
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl goridge.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs internal.Payload) (internal.Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error)
+}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 8250d226..99212ff8 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -413,7 +413,7 @@ func Test_Broken(t *testing.T) {
data := ""
mu := &sync.Mutex{}
w.AddListener(func(event interface{}) {
- if wev, ok := event.(worker.Event); ok {
+ if wev, ok := event.(events.WorkerEvent); ok {
mu.Lock()
data = string(wev.Payload.([]byte))
mu.Unlock()
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 220ea8e9..691290b2 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -5,6 +5,7 @@ import (
"os/exec"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -40,7 +41,7 @@ type StaticPool struct {
factory worker.Factory
// distributes the events
- events worker.EventsHandler
+ events events.Handler
// manages worker states and TTLs
ww worker.Watcher
@@ -120,7 +121,7 @@ func ExecAfter(after ...After) Options {
}
// AddListener connects event listener to the pool.
-func (sp *StaticPool) AddListener(listener worker.EventListener) {
+func (sp *StaticPool) AddListener(listener events.EventListener) {
sp.events.AddListener(listener)
}
@@ -169,7 +170,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(p)
@@ -221,7 +222,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
@@ -252,7 +253,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -274,13 +275,13 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
w.State().Set(internal.StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
sp.ww.PushWorker(w)
@@ -290,7 +291,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -325,7 +326,7 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
if stopErr := sw.Stop(context.Background()); stopErr != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
return r, err
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 8b13c7c9..0794b8e6 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,8 +12,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
@@ -179,8 +178,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
block := make(chan struct{})
p.AddListener(func(event interface{}) {
- if wev, ok := event.(worker.Event); ok {
- if wev.Event == worker.EventWorkerLog {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerLog {
e := string(wev.Payload.([]byte))
if strings.ContainsAny(e, "undefined_function()") {
block <- struct{}{}
@@ -227,8 +226,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
p.AddListener(func(event interface{}) {
- if pe, ok := event.(pool.Event); ok {
- if pe.Event == pool.EventWorkerConstruct {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
wg.Done()
}
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 0a2d16f7..6d1f0c58 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -7,6 +7,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -22,13 +23,13 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
- events worker.EventsHandler
+ events events.Handler
pool pool.Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func newPoolWatcher(pool pool.Pool, events worker.EventsHandler, cfg *SupervisorConfig) Supervised {
+func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
sp := &supervised{
cfg: cfg,
events: events,
@@ -91,7 +92,7 @@ func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
return rsp, nil
}
-func (sp *supervised) AddListener(listener worker.EventListener) {
+func (sp *supervised) AddListener(listener events.EventListener) {
sp.pool.AddListener(listener)
}
@@ -156,20 +157,20 @@ func (sp *supervised) control() {
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(pool.Event{Event: pool.EventTTL, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(pool.Event{Event: pool.EventMaxMemory, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
}
@@ -197,10 +198,10 @@ func (sp *supervised) control() {
if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
return
}
- sp.events.Push(pool.Event{Event: pool.EventIdleTTL, Payload: workers[i]})
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
}
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 0fcde2c3..1eb1396e 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"go.uber.org/multierr"
@@ -193,7 +194,7 @@ func (tw *syncWorker) Created() time.Time {
return tw.w.Created()
}
-func (tw *syncWorker) AddListener(listener worker.EventListener) {
+func (tw *syncWorker) AddListener(listener events.EventListener) {
tw.w.AddListener(listener)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index d2b4374b..998ed592 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,6 +13,7 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/util"
@@ -43,7 +44,7 @@ type Process struct {
created time.Time
// updates parent supervisor or pool about Process events
- events worker.EventsHandler
+ events events.Handler
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -119,7 +120,7 @@ func (w *Process) Created() time.Time {
}
// AddListener registers new worker event listener.
-func (w *Process) AddListener(listener worker.EventListener) {
+func (w *Process) AddListener(listener events.EventListener) {
w.events.AddListener(listener)
}
@@ -279,7 +280,7 @@ func (w *Process) watch() {
buf := w.get()
// read the last data
n, _ := w.rd.Read(*buf)
- w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
w.stderr.Write((*buf)[:n])
@@ -290,7 +291,7 @@ func (w *Process) watch() {
// read the max 10kb of stderr per one read
buf := w.get()
n, _ := w.rd.Read(*buf)
- w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
w.stderr.Write((*buf)[:n])
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 530ce5d6..8a71ff8a 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
@@ -140,7 +140,7 @@ func (stack *Stack) Destroy(ctx context.Context) {
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events worker.EventsHandler) worker.Watcher {
+func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -158,7 +158,7 @@ type workerWatcher struct {
allocator worker.Allocator
initialNumWorkers int64
actualNumWorkers int64
- events worker.EventsHandler
+ events events.Handler
}
func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
@@ -229,8 +229,8 @@ func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Unlock()
ww.PushWorker(sw)
- ww.events.Push(pool.Event{
- Event: pool.EventWorkerConstruct,
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventWorkerConstruct,
Payload: sw,
})
@@ -279,8 +279,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("process wait")
err := w.Wait()
if err != nil {
- ww.events.Push(worker.Event{
- Event: worker.EventWorkerError,
+ ww.events.Push(events.WorkerEvent{
+ Event: events.EventWorkerError,
Worker: w,
Payload: errors.E(op, err),
})
@@ -294,8 +294,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
_ = ww.stack.FindAndRemoveByPid(w.Pid())
err = ww.AllocateNew()
if err != nil {
- ww.events.Push(pool.Event{
- Event: pool.EventPoolError,
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventPoolError,
Payload: errors.E(op, err),
})
}
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index 4cc08c41..57590bfd 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -10,9 +10,9 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
)
const (
@@ -26,7 +26,7 @@ const (
const MB = 1024 * 1024
type Handle interface {
- AddListener(l worker.EventListener)
+ AddListener(l events.EventListener)
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
@@ -75,7 +75,7 @@ type handler struct {
log log.Logger
pool pool.Pool
mul sync.Mutex
- lsn worker.EventListener
+ lsn events.EventListener
}
func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) {
@@ -91,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool po
}
// Listen attaches handler event controller.
-func (h *handler) AddListener(l worker.EventListener) {
+func (h *handler) AddListener(l events.EventListener) {
h.mul.Lock()
defer h.mul.Unlock()
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 9cb01d4b..460263f6 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -15,6 +15,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/endure"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
@@ -55,8 +56,8 @@ type Plugin struct {
cfg *Config
// middlewares to chain
mdwr middleware
- // Event listener to stdout
- listener worker.EventListener
+ // WorkerEvent listener to stdout
+ listener events.EventListener
// Pool which attached to all servers
pool pool.Pool
@@ -71,7 +72,7 @@ type Plugin struct {
}
// AddListener attaches server event controller.
-func (s *Plugin) AddListener(listener worker.EventListener) {
+func (s *Plugin) AddListener(listener events.EventListener) {
// save listeners for Reset
s.listener = listener
s.pool.AddListener(listener)
@@ -124,8 +125,8 @@ func (s *Plugin) logCallback(event interface{}) {
s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr)
case ErrorEvent:
s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error)
- case worker.Event:
- s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.State())
+ case events.WorkerEvent:
+ s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.(worker.BaseProcess).State())
default:
fmt.Println(event)
}
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 2979949d..8b15cf0c 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -19,7 +19,7 @@ import (
"github.com/spiral/endure"
"github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/mocks"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
@@ -901,7 +901,7 @@ func TestHttpEchoErr(t *testing.T) {
mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1")
mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any())
- mockLogger.EXPECT().Debug("worker event received", "event", worker.EventWorkerLog, "worker state", gomock.Any())
+ mockLogger.EXPECT().Debug("worker event received", "event", events.EventWorkerLog, "worker state", gomock.Any())
err = cont.RegisterAll(
cfg,
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 7c91bbcc..e6003fbc 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -8,6 +8,7 @@ import (
"strings"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
@@ -169,12 +170,12 @@ func (server *Plugin) setEnv(e server.Env) []string {
}
func (server *Plugin) collectLogs(event interface{}) {
- if we, ok := event.(worker.Event); ok {
+ if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
- case worker.EventWorkerError:
- server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
- case worker.EventWorkerLog:
- server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
+ case events.EventWorkerError:
+ server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
+ case events.EventWorkerLog:
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
}
}
}
diff --git a/util/events.go b/util/events.go
index 8aa5a1c6..259c7ddb 100755
--- a/util/events.go
+++ b/util/events.go
@@ -1,14 +1,16 @@
package util
-import "github.com/spiral/roadrunner/v2/interfaces/worker"
+import (
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+)
// EventHandler helps to broadcast events to multiple listeners.
type EventHandler struct {
- listeners []worker.EventListener
+ listeners []events.EventListener
}
-func NewEventsHandler() worker.EventsHandler {
- return &EventHandler{listeners: make([]worker.EventListener, 0, 2)}
+func NewEventsHandler() events.Handler {
+ return &EventHandler{listeners: make([]events.EventListener, 0, 2)}
}
// NumListeners returns number of event listeners.
@@ -17,7 +19,7 @@ func (eb *EventHandler) NumListeners() int {
}
// AddListener registers new event listener.
-func (eb *EventHandler) AddListener(listener worker.EventListener) {
+func (eb *EventHandler) AddListener(listener events.EventListener) {
eb.listeners = append(eb.listeners, listener)
}