summaryrefslogtreecommitdiff
path: root/pool.go
blob: fe864878b59b96d40c97311dae2fedbf8bd68e25 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package roadrunner

import (
	"context"
	"github.com/spiral/roadrunner/v2/util"
	"runtime"
	"time"
)

// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
type PoolEvent struct {
	// Event type, see below.
	Event int64

	// Payload depends on event type, typically it's worker or error.
	Payload interface{}
}

const (
	// EventWorkerConstruct thrown when new worker is spawned.
	EventWorkerConstruct = iota + 7800

	// EventWorkerDestruct thrown after worker destruction.
	EventWorkerDestruct

	// EventPoolError caused on pool wide errors.
	EventPoolError

	// EventSupervisorError triggered when supervisor can not complete work.
	EventSupervisorError

	// todo: EventMaxMemory caused when worker consumes more memory than allowed.
	EventMaxMemory

	// todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
	EventTTL

	// todo: EventIdleTTL triggered when worker spends too much time at rest.
	EventIdleTTL

	// todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
	EventExecTTL
)

// Pool managed set of inner worker processes.
type Pool interface {
	// AddListener connects event listener to the pool.
	AddListener(listener util.EventListener)

	// GetConfig returns pool configuration.
	GetConfig() Config

	// Exec
	Exec(rqs Payload) (Payload, error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []WorkerBase)

	// Remove worker from the pool.
	RemoveWorker(ctx context.Context, worker WorkerBase) error

	// Destroy all underlying stack (but let them to complete the task).
	Destroy(ctx context.Context)
}

// Configures the pool behaviour.
type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
	NumWorkers int64

	// MaxJobs defines how many executions is allowed for the worker until
	// it's destruction. set 1 to create new process for each new task, 0 to let
	// worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task. Defaults to 60s.
	AllocateTimeout time.Duration

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
	DestroyTimeout time.Duration

	// Supervision config to limit worker and pool memory usage.
	Supervisor SupervisorConfig
}

// InitDefaults enables default config values.
func (cfg *Config) InitDefaults() {
	if cfg.NumWorkers == 0 {
		cfg.NumWorkers = int64(runtime.NumCPU())
	}

	if cfg.AllocateTimeout == 0 {
		cfg.AllocateTimeout = time.Minute
	}

	if cfg.DestroyTimeout == 0 {
		cfg.DestroyTimeout = time.Minute
	}

	cfg.Supervisor.InitDefaults()
}

type SupervisorConfig struct {
	// WatchTick defines how often to check the state of worker.
	WatchTick time.Duration

	// TTL defines maximum time worker is allowed to live.
	TTL int64

	// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
	IdleTTL int64

	// ExecTTL defines maximum lifetime per job.
	ExecTTL time.Duration

	// MaxWorkerMemory limits memory per worker.
	MaxWorkerMemory uint64
}

// InitDefaults enables default config values.
func (cfg *SupervisorConfig) InitDefaults() {
	if cfg.WatchTick == 0 {
		cfg.WatchTick = time.Second
	}
}