summaryrefslogtreecommitdiff
path: root/interfaces
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-17 03:16:55 +0300
committerValery Piashchynski <[email protected]>2020-12-17 03:16:55 +0300
commit40cfd9f6b44dfe987bfbf010bf67b32abdc64208 (patch)
tree10e3c3cd0805619ac30533078eb7d2585877a1b3 /interfaces
parent9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (diff)
Now better
Diffstat (limited to 'interfaces')
-rw-r--r--interfaces/events/handler.go10
-rw-r--r--interfaces/events/pool_events.go65
-rw-r--r--interfaces/events/worker_events.go34
-rwxr-xr-xinterfaces/factory/factory.go2
-rw-r--r--interfaces/pool/pool.go102
-rw-r--r--interfaces/worker/factory.go18
-rw-r--r--interfaces/worker/watcher.go26
-rw-r--r--interfaces/worker/worker.go62
8 files changed, 318 insertions, 1 deletions
diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go
new file mode 100644
index 00000000..01f64d73
--- /dev/null
+++ b/interfaces/events/handler.go
@@ -0,0 +1,10 @@
+package events
+
+type Handler interface {
+ NumListeners() int
+ AddListener(listener EventListener)
+ Push(e interface{})
+}
+
+// Event listener listens for the events produced by worker, worker pool or other service.
+type EventListener func(event interface{})
diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go
new file mode 100644
index 00000000..cc32f6b2
--- /dev/null
+++ b/interfaces/events/pool_events.go
@@ -0,0 +1,65 @@
+package events
+
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 7800
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ EventTTL
+
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go
new file mode 100644
index 00000000..497f0a06
--- /dev/null
+++ b/interfaces/events/worker_events.go
@@ -0,0 +1,34 @@
+package events
+
+// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError E = iota + 200
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+)
+
+type E int64
+
+func (ev E) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event E
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}
diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go
index 036ff4e7..51b73501 100755
--- a/interfaces/factory/factory.go
+++ b/interfaces/factory/factory.go
@@ -4,7 +4,7 @@ import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
)
// Factory is responsible of wrapping given command into tasks WorkerProcess.
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go
index 4eadf064..a1015fd6 100644
--- a/interfaces/pool/pool.go
+++ b/interfaces/pool/pool.go
@@ -1 +1,103 @@
package pool
+
+import (
+ "context"
+ "runtime"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // AddListener connects event listener to the pool.
+ AddListener(listener events.EventListener)
+
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec
+ Exec(rqs internal.Payload) (internal.Payload, error)
+
+ ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []worker.BaseProcess)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker worker.BaseProcess) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
+
+// Configures the pool behaviour.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers int64
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs int64
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = int64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL uint64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL uint64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL uint64
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = 1
+ }
+}
diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go
new file mode 100644
index 00000000..19e2bf5d
--- /dev/null
+++ b/interfaces/worker/factory.go
@@ -0,0 +1,18 @@
+package worker
+
+import (
+ "context"
+ "os/exec"
+)
+
+// Factory is responsible of wrapping given command into tasks WorkerProcess.
+type Factory interface {
+ // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // Process must not be started.
+ SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error)
+ // SpawnWorker creates new WorkerProcess process based on given command.
+ // Process must not be started.
+ SpawnWorker(*exec.Cmd) (BaseProcess, error)
+ // Close the factory and underlying connections.
+ Close(ctx context.Context) error
+}
diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go
new file mode 100644
index 00000000..ce2c1c5a
--- /dev/null
+++ b/interfaces/worker/watcher.go
@@ -0,0 +1,26 @@
+package worker
+
+import "context"
+
+type Watcher interface {
+ // AddToWatch used to add stack to wait its state
+ AddToWatch(workers []BaseProcess) error
+
+ // GetFreeWorker provide first free worker
+ GetFreeWorker(ctx context.Context) (BaseProcess, error)
+
+ // PutWorker enqueues worker back
+ PushWorker(w BaseProcess)
+
+ // AllocateNew used to allocate new worker and put in into the WorkerWatcher
+ AllocateNew() error
+
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+
+ // WorkersList return all stack w/o removing it from internal storage
+ WorkersList() []BaseProcess
+
+ // RemoveWorker remove worker from the stack
+ RemoveWorker(wb BaseProcess) error
+}
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
new file mode 100644
index 00000000..edbc68d9
--- /dev/null
+++ b/interfaces/worker/worker.go
@@ -0,0 +1,62 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (BaseProcess, error)
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // AddListener attaches listener to consume worker events.
+ AddListener(listener events.EventListener)
+
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() internal.State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop(ctx context.Context) error
+
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill() error
+
+ // Relay returns attached to worker goridge relay
+ Relay() goridge.Relay
+
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl goridge.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs internal.Payload) (internal.Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error)
+}