summaryrefslogtreecommitdiff
path: root/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pool.go')
-rwxr-xr-x[-rw-r--r--]pool.go136
1 files changed, 118 insertions, 18 deletions
diff --git a/pool.go b/pool.go
index d863e96f..3e38c4cb 100644..100755
--- a/pool.go
+++ b/pool.go
@@ -1,39 +1,139 @@
package roadrunner
+import (
+ "context"
+ "runtime"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event int64
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
+
const (
// EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct = iota + 100
+ EventWorkerConstruct = iota + 7800
// EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
- // EventWorkerKill thrown after worker is being forcefully killed.
- EventWorkerKill
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
- // EventWorkerError thrown any worker related even happen (passed with WorkerError)
- EventWorkerError
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
- // EventWorkerDead thrown when worker stops worker for any reason.
- EventWorkerDead
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
- // EventPoolError caused on pool wide errors
- EventPoolError
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ EventTTL
+
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
)
// Pool managed set of inner worker processes.
type Pool interface {
- // Listen all caused events to attached controller.
- Listen(l func(event int, ctx interface{}))
+ // AddListener connects event listener to the pool.
+ AddListener(listener util.EventListener)
+
+ // GetConfig returns pool configuration.
+ GetConfig() PoolConfig
- // Exec one task with given payload and context, returns result or error.
- Exec(rqs *Payload) (rsp *Payload, err error)
+ // Exec
+ Exec(rqs Payload) (Payload, error)
+
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
// Workers returns worker list associated with the pool.
- Workers() (workers []*Worker)
+ Workers() (workers []WorkerBase)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker WorkerBase) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
- // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker.
- Remove(w *Worker, err error) bool
+// Configures the pool behaviour.
+type PoolConfig struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers int64
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs int64
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig
+}
+
+// InitDefaults enables default config values.
+func (cfg *PoolConfig) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = int64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL uint64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL uint64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL uint64
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
- // Destroy all underlying workers (but let them to complete the task).
- Destroy()
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = 1
+ }
}