summaryrefslogtreecommitdiff
path: root/service/limit/controller.go
blob: 24a158f7b6dd7f179f5eea39d97646398f45edef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package limit

import (
	"fmt"
	"github.com/spiral/roadrunner"
	"github.com/spiral/roadrunner/util"
	"time"
)

const (
	// EventMaxMemory caused when worker consumes more memory than allowed.
	EventMaxMemory = iota + 8000

	// EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
	EventTTL

	// EventIdleTTL triggered when worker spends too much time at rest.
	EventIdleTTL

	// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
	EventExecTTL
)

// handles controller events
type listener func(event int, ctx interface{})

// defines the controller behaviour
type controllerConfig struct {
	// MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
	MaxMemory uint64

	// TTL defines maximum time worker is allowed to live.
	TTL int64

	// IdleTTL defines maximum duration worker can spend in idle mode.
	IdleTTL int64

	// ExecTTL defines maximum lifetime per job.
	ExecTTL int64
}

type controller struct {
	lsn  listener
	tick time.Duration
	cfg  *controllerConfig

	// list of workers which are currently working
	sw *stateFilter

	stop chan interface{}
}

// control the pool state
func (c *controller) control(p roadrunner.Pool) {
	c.loadWorkers(p)

	now := time.Now()

	if c.cfg.ExecTTL != 0 {
		for _, w := range c.sw.find(
			roadrunner.StateWorking,
			now.Add(-time.Second*time.Duration(c.cfg.ExecTTL)),
		) {
			eID := w.State().NumExecs()
			err := fmt.Errorf("max exec time reached (%vs)", c.cfg.ExecTTL)

			// make sure worker still on initial request
			if p.Remove(w, err) && w.State().NumExecs() == eID {
				go func() {
					err := w.Kill()
					if err != nil {
						fmt.Printf("error killing worker with PID number: %d, created: %s", w.Pid, w.Created)
					}
				}()
				c.report(EventExecTTL, w, err)
			}
		}
	}

	// locale workers which are in idle mode for too long
	if c.cfg.IdleTTL != 0 {
		for _, w := range c.sw.find(
			roadrunner.StateReady,
			now.Add(-time.Second*time.Duration(c.cfg.IdleTTL)),
		) {
			err := fmt.Errorf("max idle time reached (%vs)", c.cfg.IdleTTL)
			if p.Remove(w, err) {
				c.report(EventIdleTTL, w, err)
			}
		}
	}
}

func (c *controller) loadWorkers(p roadrunner.Pool) {
	now := time.Now()

	for _, w := range p.Workers() {
		if w.State().Value() == roadrunner.StateInvalid {
			// skip duplicate assessment
			continue
		}

		s, err := util.WorkerState(w)
		if err != nil {
			continue
		}

		if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) {
			err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL)
			if p.Remove(w, err) {
				c.report(EventTTL, w, err)
			}
			continue
		}

		if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 {
			err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory)
			if p.Remove(w, err) {
				c.report(EventMaxMemory, w, err)
			}
			continue
		}

		// control the worker state changes
		c.sw.push(w)
	}

	c.sw.sync(now)
}

// throw controller event
func (c *controller) report(event int, worker *roadrunner.Worker, caused error) {
	if c.lsn != nil {
		c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
	}
}

// Attach controller to the pool
func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller {
	wp := &controller{
		tick: c.tick,
		lsn:  c.lsn,
		cfg:  c.cfg,
		sw:   newStateFilter(),
		stop: make(chan interface{}),
	}

	go func(wp *controller, pool roadrunner.Pool) {
		ticker := time.NewTicker(wp.tick)
		for {
			select {
			case <-ticker.C:
				wp.control(pool)
			case <-wp.stop:
				return
			}
		}
	}(wp, pool)

	return wp
}

// Detach controller from the pool.
func (c *controller) Detach() {
	close(c.stop)
}