summaryrefslogtreecommitdiff
path: root/service/watcher/state_watch.go
diff options
context:
space:
mode:
Diffstat (limited to 'service/watcher/state_watch.go')
-rw-r--r--service/watcher/state_watch.go58
1 files changed, 58 insertions, 0 deletions
diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go
new file mode 100644
index 00000000..3090d15d
--- /dev/null
+++ b/service/watcher/state_watch.go
@@ -0,0 +1,58 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+type stateWatcher struct {
+ prev map[*roadrunner.Worker]state
+ next map[*roadrunner.Worker]state
+}
+
+type state struct {
+ state int64
+ numExecs int64
+ since time.Time
+}
+
+func newStateWatcher() *stateWatcher {
+ return &stateWatcher{
+ prev: make(map[*roadrunner.Worker]state),
+ next: make(map[*roadrunner.Worker]state),
+ }
+}
+
+// add new worker to be watched
+func (sw *stateWatcher) push(w *roadrunner.Worker) {
+ sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateWatcher) sync(t time.Time) {
+ for w := range sw.prev {
+ if _, ok := sw.next[w]; !ok {
+ delete(sw.prev, w)
+ }
+ }
+
+ for w, s := range sw.next {
+ ps, ok := sw.prev[w]
+ if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+ sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+ }
+
+ delete(sw.next, w)
+ }
+}
+
+// find all workers which spend given amount of time in a specific state.
+func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+ for w, s := range sw.prev {
+ if s.state == state && s.since.Before(since) {
+ workers = append(workers, w)
+ }
+ }
+
+ return
+}