1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
package watcher
import (
"fmt"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/util"
"time"
)
const (
// EventMaxMemory caused when worker consumes more memory than allowed.
EventMaxMemory = iota + 8000
// EventMaxTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
EventMaxTTL
// EventMaxIdleTTL triggered when worker spends too much time at rest.
EventMaxIdleTTL
// EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
EventMaxExecTTL
)
// handles controller events
type listener func(event int, ctx interface{})
// defines the controller behaviour
type controllerConfig struct {
// MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
MaxMemory uint64
// TTL defines maximum time worker is allowed to live.
TTL int64
// MaxIdleTTL defines maximum duration worker can spend in idle mode.
MaxIdleTTL int64
// MaxExecTTL defines maximum lifetime per job.
MaxExecTTL int64
}
type controller struct {
lsn listener
tick time.Duration
cfg *controllerConfig
// list of workers which are currently working
sw *stateFilter
stop chan interface{}
}
// control the pool state
func (c *controller) control(p roadrunner.Pool) {
c.loadWorkers(p)
now := time.Now()
if c.cfg.MaxExecTTL != 0 {
for _, w := range c.sw.find(
roadrunner.StateWorking,
now.Add(-time.Second*time.Duration(c.cfg.MaxExecTTL)),
) {
eID := w.State().NumExecs()
err := fmt.Errorf("max exec time reached (%vs)", c.cfg.MaxExecTTL)
// make sure worker still on initial request
if p.Remove(w, err) && w.State().NumExecs() == eID {
go w.Kill()
c.report(EventMaxExecTTL, w, err)
}
}
}
// locale workers which are in idle mode for too long
if c.cfg.MaxIdleTTL != 0 {
for _, w := range c.sw.find(
roadrunner.StateReady,
now.Add(-time.Second*time.Duration(c.cfg.MaxIdleTTL)),
) {
err := fmt.Errorf("max idle time reached (%vs)", c.cfg.MaxIdleTTL)
if p.Remove(w, err) {
c.report(EventMaxIdleTTL, w, err)
}
}
}
}
func (c *controller) loadWorkers(p roadrunner.Pool) {
now := time.Now()
for _, w := range p.Workers() {
if w.State().Value() == roadrunner.StateInvalid {
// skip duplicate assessment
continue
}
s, err := util.WorkerState(w)
if err != nil {
continue
}
if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) {
err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL)
if p.Remove(w, err) {
c.report(EventMaxTTL, w, err)
}
continue
}
if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 {
err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory)
if p.Remove(w, err) {
c.report(EventMaxMemory, w, err)
}
continue
}
// control the worker state changes
c.sw.push(w)
}
c.sw.sync(now)
}
// throw controller event
func (c *controller) report(event int, worker *roadrunner.Worker, caused error) {
if c.lsn != nil {
c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
}
}
// Attach controller to the pool
func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller {
wp := &controller{
tick: c.tick,
lsn: c.lsn,
cfg: c.cfg,
sw: newStateFilter(),
stop: make(chan interface{}),
}
go func(wp *controller, pool roadrunner.Pool) {
ticker := time.NewTicker(wp.tick)
for {
select {
case <-ticker.C:
wp.control(pool)
case <-wp.stop:
return
}
}
}(wp, pool)
return wp
}
// Detach controller from the pool.
func (c *controller) Detach() {
close(c.stop)
}
|