diff options
Diffstat (limited to 'interfaces')
-rw-r--r-- | interfaces/events/handler.go | 10 | ||||
-rw-r--r-- | interfaces/events/pool_events.go | 65 | ||||
-rw-r--r-- | interfaces/events/worker_events.go | 34 | ||||
-rwxr-xr-x | interfaces/factory/factory.go | 22 | ||||
-rw-r--r-- | interfaces/informer/interface.go | 6 | ||||
-rw-r--r-- | interfaces/pool/pool.go | 103 | ||||
-rw-r--r-- | interfaces/server/interface.go | 8 | ||||
-rw-r--r-- | interfaces/worker/factory.go | 18 | ||||
-rw-r--r-- | interfaces/worker/watcher.go | 26 | ||||
-rw-r--r-- | interfaces/worker/worker.go | 62 |
10 files changed, 349 insertions, 5 deletions
diff --git a/interfaces/events/handler.go b/interfaces/events/handler.go new file mode 100644 index 00000000..01f64d73 --- /dev/null +++ b/interfaces/events/handler.go @@ -0,0 +1,10 @@ +package events + +type Handler interface { + NumListeners() int + AddListener(listener EventListener) + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type EventListener func(event interface{}) diff --git a/interfaces/events/pool_events.go b/interfaces/events/pool_events.go new file mode 100644 index 00000000..cc32f6b2 --- /dev/null +++ b/interfaces/events/pool_events.go @@ -0,0 +1,65 @@ +package events + +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 7800 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // todo: EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + EventTTL + + // todo: EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/interfaces/events/worker_events.go b/interfaces/events/worker_events.go new file mode 100644 index 00000000..497f0a06 --- /dev/null +++ b/interfaces/events/worker_events.go @@ -0,0 +1,34 @@ +package events + +// EventWorkerKill thrown after WorkerProcess is being forcefully killed. +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError E = iota + 200 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type E int64 + +func (ev E) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event E + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go new file mode 100755 index 00000000..51b73501 --- /dev/null +++ b/interfaces/factory/factory.go @@ -0,0 +1,22 @@ +package worker + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/interfaces/worker" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with contex. + // Process must not be started. + SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error) + + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd) (worker.BaseProcess, error) + + // Close the factory and underlying connections. + Close(ctx context.Context) error +} diff --git a/interfaces/informer/interface.go b/interfaces/informer/interface.go index a8d32841..b975edd7 100644 --- a/interfaces/informer/interface.go +++ b/interfaces/informer/interface.go @@ -1,8 +1,10 @@ package informer -import "github.com/spiral/roadrunner/v2" +import ( + "github.com/spiral/roadrunner/v2/interfaces/worker" +) // Informer used to get workers from particular plugin or set of plugins type Informer interface { - Workers() []roadrunner.WorkerBase + Workers() []worker.BaseProcess } diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go new file mode 100644 index 00000000..a1015fd6 --- /dev/null +++ b/interfaces/pool/pool.go @@ -0,0 +1,103 @@ +package pool + +import ( + "context" + "runtime" + "time" + + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // AddListener connects event listener to the pool. + AddListener(listener events.EventListener) + + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec + Exec(rqs internal.Payload) (internal.Payload, error) + + ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []worker.BaseProcess) + + // Remove worker from the pool. + RemoveWorker(worker worker.BaseProcess) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} + +// Configures the pool behaviour. +type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers int64 + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs int64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig +} + +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = int64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick uint64 + + // TTL defines maximum time worker is allowed to live. + TTL uint64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL uint64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL uint64 + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = 1 + } +} diff --git a/interfaces/server/interface.go b/interfaces/server/interface.go index 2dae30c5..c50848d7 100644 --- a/interfaces/server/interface.go +++ b/interfaces/server/interface.go @@ -4,7 +4,9 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" ) type Env map[string]string @@ -12,6 +14,6 @@ type Env map[string]string // Server creates workers for the application. type Server interface { CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env Env) (roadrunner.Pool, error) + NewWorker(ctx context.Context, env Env) (worker.BaseProcess, error) + NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env) (pool.Pool, error) } diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go new file mode 100644 index 00000000..19e2bf5d --- /dev/null +++ b/interfaces/worker/factory.go @@ -0,0 +1,18 @@ +package worker + +import ( + "context" + "os/exec" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd) (BaseProcess, error) + // Close the factory and underlying connections. + Close(ctx context.Context) error +} diff --git a/interfaces/worker/watcher.go b/interfaces/worker/watcher.go new file mode 100644 index 00000000..ce2c1c5a --- /dev/null +++ b/interfaces/worker/watcher.go @@ -0,0 +1,26 @@ +package worker + +import "context" + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []BaseProcess) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (BaseProcess, error) + + // PutWorker enqueues worker back + PushWorker(w BaseProcess) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []BaseProcess + + // RemoveWorker remove worker from the stack + RemoveWorker(wb BaseProcess) error +} diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go new file mode 100644 index 00000000..edbc68d9 --- /dev/null +++ b/interfaces/worker/worker.go @@ -0,0 +1,62 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3" + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/internal" +) + +// Allocator is responsible for worker allocation in the pool +type Allocator func() (BaseProcess, error) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // AddListener attaches listener to consume worker events. + AddListener(listener events.EventListener) + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop(ctx context.Context) error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() goridge.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl goridge.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs internal.Payload) (internal.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) +} |