diff options
Diffstat (limited to 'pool.go')
-rwxr-xr-x[-rw-r--r--] | pool.go | 136 |
1 files changed, 118 insertions, 18 deletions
@@ -1,39 +1,139 @@ package roadrunner +import ( + "context" + "runtime" + "time" + + "github.com/spiral/roadrunner/v2/util" +) + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event int64 + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} + const ( // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct = iota + 100 + EventWorkerConstruct = iota + 7800 // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct - // EventWorkerKill thrown after worker is being forcefully killed. - EventWorkerKill + // EventPoolError caused on pool wide errors. + EventPoolError - // EventWorkerError thrown any worker related even happen (passed with WorkerError) - EventWorkerError + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError - // EventWorkerDead thrown when worker stops worker for any reason. - EventWorkerDead + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers - // EventPoolError caused on pool wide errors - EventPoolError + // todo: EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + EventTTL + + // todo: EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL ) // Pool managed set of inner worker processes. type Pool interface { - // Listen all caused events to attached controller. - Listen(l func(event int, ctx interface{})) + // AddListener connects event listener to the pool. + AddListener(listener util.EventListener) + + // GetConfig returns pool configuration. + GetConfig() PoolConfig - // Exec one task with given payload and context, returns result or error. - Exec(rqs *Payload) (rsp *Payload, err error) + // Exec + Exec(rqs Payload) (Payload, error) + + ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) // Workers returns worker list associated with the pool. - Workers() (workers []*Worker) + Workers() (workers []WorkerBase) + + // Remove worker from the pool. + RemoveWorker(worker WorkerBase) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} - // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker. - Remove(w *Worker, err error) bool +// Configures the pool behaviour. +type PoolConfig struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers int64 + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs int64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig +} + +// InitDefaults enables default config values. +func (cfg *PoolConfig) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = int64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick uint64 + + // TTL defines maximum time worker is allowed to live. + TTL uint64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL uint64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL uint64 + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} - // Destroy all underlying workers (but let them to complete the task). - Destroy() +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = 1 + } } |