diff options
Diffstat (limited to 'pool.go')
-rw-r--r-- | pool.go | 117 |
1 files changed, 108 insertions, 9 deletions
@@ -1,5 +1,12 @@ package roadrunner +import ( + "context" + "fmt" + "runtime" + "time" +) + const ( // EventWorkerConstruct thrown when new worker is spawned. EventWorkerConstruct = iota + 100 @@ -11,7 +18,7 @@ const ( EventWorkerKill // EventWorkerError thrown any worker related even happen (passed with WorkerError) - EventWorkerError + EventWorkerEvent // EventWorkerDead thrown when worker stops worker for any reason. EventWorkerDead @@ -20,20 +27,112 @@ const ( EventPoolError ) +const ( + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory = iota + 8000 + + // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + // Pool managed set of inner worker processes. type Pool interface { - // Listen all caused events to attached controller. - Listen(l func(event int, ctx interface{})) + // ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK + Events() chan PoolEvent // Exec one task with given payload and context, returns result or error. - Exec(rqs *Payload) (rsp *Payload, err error) + Exec(ctx context.Context, rqs Payload) (Payload, error) // Workers returns worker list associated with the pool. - Workers() (workers []*Worker) + Workers(ctx context.Context) (workers []WorkerBase) + + RemoveWorker(ctx context.Context, worker WorkerBase) error + + Config() Config + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} + +// todo: merge with pool options + +// Config defines basic behaviour of worker creation and handling process. +// +type Config struct { + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. + NumWorkers int64 + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs int64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. + AllocateTimeout time.Duration + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. + DestroyTimeout time.Duration + + // TTL defines maximum time worker is allowed to live. + TTL int64 + + // IdleTTL defines maximum duration worker can spend in idle mode. + IdleTTL int64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL time.Duration + + // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes. + MaxPoolMemory uint64 + + MaxWorkerMemory uint64 + + // config from limit plugin, combine TODO + // single bootstrap TODO, bool + // warmup one worker and start consume requests and then start the rest of the stack + + // max memory for pool + // max ttl + // max idle ttl + + // ATTACHER interface - delete +} + +// InitDefaults allows to init blank config with pre-defined set of default values. +func (cfg *Config) InitDefaults() error { + cfg.AllocateTimeout = time.Minute + cfg.DestroyTimeout = time.Minute + cfg.NumWorkers = int64(runtime.NumCPU()) + + return nil +} + +// Valid returns error if config not valid. +func (cfg *Config) Valid() error { + if cfg.NumWorkers == 0 { + return fmt.Errorf("pool.NumWorkers must be set") + } + + if cfg.AllocateTimeout == 0 { + return fmt.Errorf("pool.AllocateTimeout must be set") + } + + if cfg.DestroyTimeout == 0 { + return fmt.Errorf("pool.DestroyTimeout must be set") + } - // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker. - Remove(w *Worker, err error) bool + if cfg.ExecTTL == 0 { + return fmt.Errorf("pool.ExecTTL must be set") + } - // Destroy all underlying workers (but let them to complete the task). - Destroy() + return nil } |