summaryrefslogtreecommitdiff
path: root/service/limit/state_filter.go
blob: cd2eca94d05aecdc7f0e6aa701590cae85e7a977 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package limit

import (
	"github.com/spiral/roadrunner"
	"time"
)

type stateFilter struct {
	prev map[*roadrunner.Worker]state
	next map[*roadrunner.Worker]state
}

type state struct {
	state    int64
	numExecs int64
	since    time.Time
}

func newStateFilter() *stateFilter {
	return &stateFilter{
		prev: make(map[*roadrunner.Worker]state),
		next: make(map[*roadrunner.Worker]state),
	}
}

// add new worker to be watched
func (sw *stateFilter) push(w *roadrunner.Worker) {
	sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
}

// update worker states.
func (sw *stateFilter) sync(t time.Time) {
	for w := range sw.prev {
		if _, ok := sw.next[w]; !ok {
			delete(sw.prev, w)
		}
	}

	for w, s := range sw.next {
		ps, ok := sw.prev[w]
		if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
			sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
		}

		delete(sw.next, w)
	}
}

// find all workers which spend given amount of time in a specific state.
func (sw *stateFilter) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
	for w, s := range sw.prev {
		if s.state == state && s.since.Before(since) {
			workers = append(workers, w)
		}
	}

	return
}