summaryrefslogtreecommitdiff
path: root/pool.go
blob: 67d092c087cc3a10872017359d1dba0f507420c5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package roadrunner

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

// Worker and pool level event codes. Values are assigned sequentially
// starting at 100.
const (
	// EventWorkerConstruct thrown when new worker is spawned.
	EventWorkerConstruct = iota + 100

	// EventWorkerDestruct thrown after worker destruction.
	EventWorkerDestruct

	// EventWorkerKill thrown after worker has been forcefully killed.
	EventWorkerKill

	// EventWorkerEvent thrown when any worker related event happens (passed with WorkerError).
	EventWorkerEvent

	// EventWorkerDead thrown when worker stops working for any reason.
	EventWorkerDead

	// EventPoolError caused on pool wide errors.
	EventPoolError
)

// Limit related event codes. Values are assigned sequentially starting
// at 8000.
const (
	// EventMaxMemory caused when worker consumes more memory than allowed.
	EventMaxMemory = iota + 8000

	// EventTTL thrown when worker is removed due to TTL being reached. Context is rr.WorkerError.
	EventTTL

	// EventIdleTTL triggered when worker spends too much time at rest.
	EventIdleTTL

	// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
	EventExecTTL
)

// Pool manages a set of inner worker processes.
type Pool interface {
	// Events returns the channel on which the pool publishes PoolEvent values.
	//
	// ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK
	Events() chan PoolEvent

	// Exec one task with given payload and context, returns result or error.
	Exec(ctx context.Context, rqs Payload) (Payload, error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []WorkerBase)

	// RemoveWorker removes the given worker from the pool and reports an
	// error on failure.
	// NOTE(review): whether the worker is stopped/killed as part of removal
	// is implementation defined and not visible here — confirm.
	RemoveWorker(ctx context.Context, worker WorkerBase) error

	// Config returns a Config value; presumably the configuration the pool
	// was created with — TODO confirm against implementations.
	Config() Config

	// Destroy all underlying stack (but let the workers complete their
	// current task).
	Destroy(ctx context.Context)
}

// todo: merge with pool options

// Config defines basic behaviour of worker creation and handling process.
type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap.
	NumWorkers int64

	// MaxJobs defines how many executions are allowed for the worker until
	// its destruction. Set 1 to create new process for each new task, 0 to let
	// worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task.
	AllocateTimeout time.Duration

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly destroy, if timeout reached worker will be killed.
	DestroyTimeout time.Duration

	// TTL defines maximum time worker is allowed to live.
	// NOTE(review): plain int64 rather than time.Duration; the unit is not
	// stated here (presumably seconds) — TODO confirm.
	TTL int64

	// IdleTTL defines maximum duration worker can spend in idle mode.
	// NOTE(review): plain int64 rather than time.Duration; the unit is not
	// stated here (presumably seconds) — TODO confirm.
	IdleTTL int64

	// ExecTTL defines maximum lifetime per job.
	ExecTTL time.Duration

	// MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes.
	// NOTE(review): the field name suggests a pool-wide limit while the
	// sentence above describes a per-worker limit — confirm which is intended.
	MaxPoolMemory uint64

	// MaxWorkerMemory is presumably the maximum amount of memory allowed for
	// a single worker, in megabytes — TODO confirm; not documented originally.
	MaxWorkerMemory uint64

	// config from limit plugin, combine TODO
	// single bootstrap TODO, bool
	// warmup one worker and start consume requests and then start the rest of the stack

	// max memory for pool
	// max ttl
	// max idle ttl

	// ATTACHER interface - delete
}

// InitDefaults fills a blank config with the pre-defined default values:
// one worker per CPU reported by the runtime and a one minute timeout for
// both worker allocation and worker destruction. All other fields are left
// untouched. It always returns nil.
//
// Note that ExecTTL is not given a default here, while Valid rejects a zero
// ExecTTL; callers have to set it themselves.
func (cfg *Config) InitDefaults() error {
	const defaultTimeout = time.Minute

	cpus := runtime.NumCPU()
	cfg.NumWorkers = int64(cpus)
	cfg.AllocateTimeout, cfg.DestroyTimeout = defaultTimeout, defaultTimeout

	return nil
}

// Valid returns an error if the config is not valid.
//
// NumWorkers, AllocateTimeout, DestroyTimeout and ExecTTL are checked in
// that order; the error names the first field that is zero or negative.
// Negative values are rejected as well as zero: a negative worker count or a
// negative timeout cannot describe a usable pool. Nil is returned when all
// four fields are positive. Other fields are not validated.
func (cfg *Config) Valid() error {
	switch {
	case cfg.NumWorkers <= 0:
		return fmt.Errorf("pool.NumWorkers must be set")
	case cfg.AllocateTimeout <= 0:
		return fmt.Errorf("pool.AllocateTimeout must be set")
	case cfg.DestroyTimeout <= 0:
		return fmt.Errorf("pool.DestroyTimeout must be set")
	case cfg.ExecTTL <= 0:
		return fmt.Errorf("pool.ExecTTL must be set")
	}

	return nil
}