summaryrefslogtreecommitdiff
path: root/service/watcher/state_watch.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 23:38:03 +0300
committerWolfy-J <[email protected]>2019-05-03 23:38:03 +0300
commit6d5b883247f0eb621ca51da791664a28c8539a52 (patch)
tree00ebb5e92799c60ac4050f634b0773969384f3ae /service/watcher/state_watch.go
parent4e4f0bfb8be5d26772060bc77256cea7cbf68138 (diff)
watching
Diffstat (limited to 'service/watcher/state_watch.go')
-rw-r--r--service/watcher/state_watch.go58
1 files changed, 58 insertions, 0 deletions
diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go
new file mode 100644
index 00000000..3090d15d
--- /dev/null
+++ b/service/watcher/state_watch.go
@@ -0,0 +1,58 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+type stateWatcher struct {
+ prev map[*roadrunner.Worker]state
+ next map[*roadrunner.Worker]state
+}
+
+type state struct {
+ state int64
+ numExecs int64
+ since time.Time
+}
+
+func newStateWatcher() *stateWatcher {
+ return &stateWatcher{
+ prev: make(map[*roadrunner.Worker]state),
+ next: make(map[*roadrunner.Worker]state),
+ }
+}
+
+// add new worker to be watched
+func (sw *stateWatcher) push(w *roadrunner.Worker) {
+ sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateWatcher) sync(t time.Time) {
+ for w := range sw.prev {
+ if _, ok := sw.next[w]; !ok {
+ delete(sw.prev, w)
+ }
+ }
+
+ for w, s := range sw.next {
+ ps, ok := sw.prev[w]
+ if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+ sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+ }
+
+ delete(sw.next, w)
+ }
+}
+
+// find all workers which spend given amount of time in a specific state.
+func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+ for w, s := range sw.prev {
+ if s.state == state && s.since.Before(since) {
+ workers = append(workers, w)
+ }
+ }
+
+ return
+}