package roadrunner

import (
	"context"
	"runtime"
	"time"
)

const (
	// EventWorkerConstruct thrown when new worker is spawned.
	EventWorkerConstruct = iota + 100

	// EventWorkerDestruct thrown after worker destruction.
	EventWorkerDestruct

	// EventWorkerKill thrown after worker is forcefully killed.
	EventWorkerKill

	// EventWorkerEvent thrown when any worker-related event happens (passed with WorkerError).
	EventWorkerEvent

	// EventWorkerDead thrown when worker stops for any reason.
	EventWorkerDead

	// EventPoolError caused on pool-wide errors.
	EventPoolError
)

const (
	// EventMaxMemory caused when worker consumes more memory than allowed.
	EventMaxMemory = iota + 8000

	// EventTTL thrown when worker is removed due to TTL being reached. Context is rr.WorkerError.
	EventTTL

	// EventIdleTTL triggered when worker spends too much time at rest.
	EventIdleTTL

	// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
	EventExecTTL
)
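
// eventName is an illustrative sketch, not part of the original API: one way a
// consumer might map the event codes above to human-readable names, e.g. for
// logging. The chosen labels are assumptions for demonstration only.
func eventName(event int) string {
	switch event {
	case EventWorkerConstruct:
		return "worker construct"
	case EventWorkerDestruct:
		return "worker destruct"
	case EventWorkerKill:
		return "worker kill"
	case EventWorkerEvent:
		return "worker event"
	case EventWorkerDead:
		return "worker dead"
	case EventPoolError:
		return "pool error"
	case EventMaxMemory:
		return "max memory reached"
	case EventTTL:
		return "ttl reached"
	case EventIdleTTL:
		return "idle ttl reached"
	case EventExecTTL:
		return "exec ttl reached"
	default:
		return "unknown event"
	}
}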

// Pool manages a set of inner worker processes.
type Pool interface {
	// Events returns the pool event channel.
	// ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK.
	Events() chan PoolEvent

	// ExecWithContext executes one task with the given payload and context, returns result or error.
	ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)

	// Exec executes one task with the given payload, returns result or error.
	Exec(rqs Payload) (Payload, error)

	// Workers returns the worker list associated with the pool.
	Workers() (workers []WorkerBase)

	// RemoveWorker removes a worker from the pool.
	RemoveWorker(ctx context.Context, worker WorkerBase) error

	// Config returns the associated pool configuration.
	Config() Config

	// Destroy all underlying stack (but let the workers complete the current task).
	Destroy(ctx context.Context)
}
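
// consumeEvents is an illustrative sketch, not part of the original API: since
// the Events() channel must be consumed to keep the pool from blocking, a caller
// could drain it in a background goroutine like this. It assumes PoolEvent is
// defined elsewhere in the package; the handle callback is a placeholder.
func consumeEvents(p Pool, handle func(PoolEvent)) {
	go func() {
		for e := range p.Events() {
			handle(e)
		}
	}()
}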

// todo: merge with pool options

// Config defines basic behaviour of the worker creation and handling process.
type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper during hot-swap. Defaults to number of CPU cores.
	NumWorkers int64

	// MaxJobs defines how many executions are allowed for the worker until
	// its destruction. Set 1 to create a new process for each new task, 0 to let
	// the worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines how long the pool will wait for a worker to
	// be freed to handle the task. Defaults to 60s.
	AllocateTimeout time.Duration

	// DestroyTimeout defines how long the pool should wait for a worker to
	// properly destroy; if the timeout is reached, the worker will be killed. Defaults to 60s.
	DestroyTimeout time.Duration

	// TTL defines maximum time a worker is allowed to live.
	TTL int64

	// IdleTTL defines maximum duration a worker can spend in idle mode. Disabled when 0.
	IdleTTL int64

	// ExecTTL defines maximum lifetime per job.
	ExecTTL time.Duration

	// MaxPoolMemory defines maximum amount of memory allowed for the pool, in megabytes.
	MaxPoolMemory uint64

	// MaxWorkerMemory limits memory per worker.
	MaxWorkerMemory uint64
}

// InitDefaults enables default config values.
func (cfg *Config) InitDefaults() {
	if cfg.NumWorkers == 0 {
		cfg.NumWorkers = int64(runtime.NumCPU())
	}

	if cfg.AllocateTimeout == 0 {
		cfg.AllocateTimeout = time.Minute
	}

	if cfg.DestroyTimeout == 0 {
		cfg.DestroyTimeout = time.Minute
	}
}
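
// exampleConfig is an illustrative sketch, not part of the original API: it
// shows how a Config with only a few fields set can rely on InitDefaults to
// fill in NumWorkers, AllocateTimeout and DestroyTimeout. The concrete values
// below are assumptions for demonstration only.
func exampleConfig() Config {
	cfg := Config{
		MaxJobs: 64,               // recycle each worker after 64 executions (assumed value)
		ExecTTL: 30 * time.Second, // hard cap per job (assumed value)
	}
	cfg.InitDefaults() // remaining fields fall back to their documented defaults
	return cfg
}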