summaryrefslogtreecommitdiff
path: root/service/watcher/watcher.go
blob: 65a2eeebb0cb9c8b08981cd6fc7e49683825b7b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package watcher

import (
	"fmt"
	"github.com/spiral/roadrunner"
	"github.com/spiral/roadrunner/util"
	"time"
)

const (
	// EventMaxMemory is raised when a worker consumes more memory than allowed.
	// Event codes in this block start at 8000 and grow by one per constant.
	EventMaxMemory = iota + 8000

	// EventMaxTTL is raised when a worker is removed due to its TTL being
	// reached. Context is roadrunner.WorkerError.
	EventMaxTTL

	// EventMaxIdleTTL is raised when a worker spends too much time at rest.
	EventMaxIdleTTL

	// EventMaxExecTTL is raised when a worker spends too much time doing the
	// task (max_execution_time).
	EventMaxExecTTL
)

// listener handles watcher events. It receives the event code (one of the
// Event* constants) and an event context; the watcher's report method passes a
// roadrunner.WorkerError as that context.
type listener func(event int, ctx interface{})

// watcherConfig defines the watcher behaviour. A zero value in any field
// disables the corresponding check.
type watcherConfig struct {
	// MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
	MaxMemory uint64

	// TTL defines maximum time worker is allowed to live. In seconds.
	TTL int64

	// MaxIdleTTL defines maximum duration worker can spend in idle mode. In seconds.
	MaxIdleTTL int64

	// MaxExecTTL defines maximum lifetime per job. In seconds.
	MaxExecTTL int64
}

// watcher periodically inspects a worker pool and removes workers which
// violate the limits set in its config.
type watcher struct {
	// lsn receives watcher events; may be nil, in which case events are dropped.
	lsn listener
	// tick is the interval between two consecutive pool inspections.
	tick time.Duration
	// cfg holds the limits enforced by the watcher.
	cfg *watcherConfig

	// list of workers which are currently working
	sw *stateWatcher

	// stop is closed by Detach to terminate the goroutine started by Attach.
	stop chan interface{}
}

// watch performs a single inspection of the pool: it refreshes the tracked
// worker states and then removes workers which exceeded the configured
// execution or idle time limits, reporting an event for each removal.
func (wch *watcher) watch(p roadrunner.Pool) {
	wch.loadWorkers(p)

	now := time.Now()

	// workers which spend too much time on the current job
	if limit := wch.cfg.MaxExecTTL; limit != 0 {
		deadline := now.Add(-time.Duration(limit) * time.Second)

		for _, w := range wch.sw.find(roadrunner.StateWorking, deadline) {
			execs := w.State().NumExecs()
			cause := fmt.Errorf("max exec time reached (%vs)", limit)

			if !p.Remove(w, cause) {
				continue
			}

			// make sure the worker is still on the initial request
			if w.State().NumExecs() != execs {
				continue
			}

			go w.Kill()
			wch.report(EventMaxExecTTL, w, cause)
		}
	}

	// locate workers which are in idle mode for too long
	if limit := wch.cfg.MaxIdleTTL; limit != 0 {
		deadline := now.Add(-time.Duration(limit) * time.Second)

		for _, w := range wch.sw.find(roadrunner.StateReady, deadline) {
			cause := fmt.Errorf("max idle time reached (%vs)", limit)
			if !p.Remove(w, cause) {
				continue
			}

			wch.report(EventMaxIdleTTL, w, cause)
		}
	}
}

// loadWorkers walks over every worker of the pool, removes the ones which
// outlived the TTL or exceeded the memory limit, and registers the remaining
// ones in the state watcher before syncing it with the current time.
func (wch *watcher) loadWorkers(p roadrunner.Pool) {
	now := time.Now()

	for _, w := range p.Workers() {
		// skip duplicate assessment
		if w.State().Value() == roadrunner.StateInvalid {
			continue
		}

		state, stateErr := util.WorkerState(w)
		if stateErr != nil {
			continue
		}

		switch {
		case wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL):
			cause := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
			if p.Remove(w, cause) {
				wch.report(EventMaxTTL, w, cause)
			}

		case wch.cfg.MaxMemory != 0 && state.MemoryUsage >= wch.cfg.MaxMemory*1024*1024:
			cause := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
			if p.Remove(w, cause) {
				wch.report(EventMaxMemory, w, cause)
			}

		default:
			// watch the worker state changes
			wch.sw.push(w)
		}
	}

	wch.sw.sync(now)
}

// report forwards the event to the listener, wrapping the worker and the
// cause into roadrunner.WorkerError. It does nothing when no listener is set.
func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
	if wch.lsn == nil {
		return
	}

	wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
}

// Attach watcher to the pool. It returns a new watcher which shares the tick
// interval, listener and config of the receiver but owns a fresh state watcher
// and stop channel. A background goroutine inspects the pool on every tick
// until Detach is called on the returned watcher.
func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
	wp := &watcher{
		tick: wch.tick,
		lsn:  wch.lsn,
		cfg:  wch.cfg,
		sw:   newStateWatcher(),
		stop: make(chan interface{}),
	}

	go func(wp *watcher, pool roadrunner.Pool) {
		ticker := time.NewTicker(wp.tick)
		// release the ticker once the watcher is detached
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				wp.watch(pool)
			case <-wp.stop:
				return
			}
		}
	}(wp, pool)

	return wp
}

// Detach watcher from the pool. Closing the stop channel makes the goroutine
// started by Attach return.
// NOTE(review): close panics on a nil or already closed channel, so this looks
// safe to call only once and only on a watcher returned by Attach — confirm
// against callers.
func (wch *watcher) Detach() {
	close(wch.stop)
}