summaryrefslogtreecommitdiff
path: root/pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-26 12:01:53 +0300
committerGitHub <[email protected]>2020-10-26 12:01:53 +0300
commit91cf918b30938129609323ded53e190385e019a6 (patch)
tree0ad9537bd438c63719fb83343ab77fc4ab34eb83 /pool.go
parent68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff)
parent9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff)
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'pool.go')
-rwxr-xr-xpool.go97
1 files changed, 55 insertions, 42 deletions
diff --git a/pool.go b/pool.go
index 343dedf6..bc57bcbd 100755
--- a/pool.go
+++ b/pool.go
@@ -4,49 +4,52 @@ import (
"context"
"runtime"
"time"
+
+ "github.com/spiral/roadrunner/v2/util"
)
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event int64
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
+
const (
// EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct = iota + 100
+ EventWorkerConstruct = iota + 7800
// EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
- // EventWorkerKill thrown after worker is being forcefully killed.
- EventWorkerKill
-
- // EventWorkerError thrown any worker related even happen (passed with WorkerError)
- EventWorkerEvent
-
- // EventWorkerDead thrown when worker stops worker for any reason.
- EventWorkerDead
-
- // EventPoolError caused on pool wide errors
+ // EventPoolError caused on pool wide errors.
EventPoolError
-)
-const (
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory = iota + 8000
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
- // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
EventTTL
- // EventIdleTTL triggered when worker spends too much time at rest.
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
EventIdleTTL
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
)
// Pool managed set of inner worker processes.
type Pool interface {
- // ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK
- Events() chan PoolEvent
+ // AddListener connects event listener to the pool.
+ AddListener(listener util.EventListener)
- // Exec one task with given payload and context, returns result or error.
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ // GetConfig returns pool configuration.
+ GetConfig() Config
// Exec
Exec(rqs Payload) (Payload, error)
@@ -54,18 +57,14 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []WorkerBase)
+ // Remove worker from the pool.
RemoveWorker(ctx context.Context, worker WorkerBase) error
- Config() Config
-
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
}
-// todo: merge with pool options
-
-// Config defines basic behaviour of worker creation and handling process.
-//
+// Configures the pool behaviour.
type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
@@ -84,20 +83,8 @@ type Config struct {
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration
- // TTL defines maximum time worker is allowed to live.
- TTL int64
-
- // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL int64
-
- // ExecTTL defines maximum lifetime per job.
- ExecTTL time.Duration
-
- // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes.
- MaxPoolMemory uint64
-
- // MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor SupervisorConfig
}
// InitDefaults enables default config values.
@@ -113,4 +100,30 @@ func (cfg *Config) InitDefaults() {
if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
+
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick time.Duration
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL int64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL time.Duration
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = time.Second
+ }
}