summaryrefslogtreecommitdiff
path: root/service/watcher/watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'service/watcher/watcher.go')
-rw-r--r--service/watcher/watcher.go153
1 files changed, 153 insertions, 0 deletions
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
new file mode 100644
index 00000000..08d477fa
--- /dev/null
+++ b/service/watcher/watcher.go
@@ -0,0 +1,153 @@
+package watcher
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
+ "time"
+)
+
+const (
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory = iota + 8000
+
+ // EventMaxTTL thrown when worker is removed due TTL being reached. Context is roadrunner.WorkerError
+ EventMaxTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time at rest.
+ EventMaxIdleTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventMaxExecTTL
+)
+
+// handles watcher events
+type listener func(event int, ctx interface{})
+
+// defines the watcher behaviour
+type watcherConfig struct {
+ // MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
+ MaxMemory uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // MaxIdleTTL defines maximum duration worker can spend in idle mode.
+ MaxIdleTTL int64
+
+ // MaxExecTTL defines maximum lifetime per job.
+ MaxExecTTL int64
+}
+
+type watcher struct {
+ lsn listener
+ tick time.Duration
+ cfg *watcherConfig
+
+ // list of workers which are currently working
+ sw *stateWatcher
+
+ stop chan interface{}
+}
+
+// watch the pool state
+func (wch *watcher) watch(p roadrunner.Pool) {
+ now := time.Now()
+
+ for _, w := range p.Workers() {
+ if w.State().Value() == roadrunner.StateInvalid {
+ // skip duplicate assessment
+ continue
+ }
+
+ s, err := util.WorkerState(w)
+ if err != nil {
+ continue
+ }
+
+ if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
+ err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxTTL, w, err)
+ }
+ continue
+ }
+
+ if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
+ if p.Remove(w, err) {
+ wch.report(EventMaxMemory, w, err)
+ }
+ continue
+ }
+
+ // watch the worker state changes
+ wch.sw.push(w)
+ }
+
+ wch.sw.sync(now)
+
+ if wch.cfg.MaxExecTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateWorking,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+ ) {
+ err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+ if p.Remove(w, err) {
+ // brutally
+ go w.Kill()
+ wch.report(EventMaxExecTTL, w, err)
+ }
+ }
+ }
+
+ // locale workers which are in idle mode for too long
+ if wch.cfg.MaxIdleTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateReady,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+ ) {
+ err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxIdleTTL, w, err)
+ }
+ }
+ }
+}
+
+// throw watcher event
+func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
+ if wch.lsn != nil {
+ wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+ }
+}
+
+// Attach watcher to the pool
+func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
+ wp := &watcher{
+ tick: wch.tick,
+ lsn: wch.lsn,
+ cfg: wch.cfg,
+ sw: newStateWatcher(),
+ stop: make(chan interface{}),
+ }
+
+ go func(wp *watcher, pool roadrunner.Pool) {
+ ticker := time.NewTicker(wp.tick)
+ for {
+ select {
+ case <-ticker.C:
+ wp.watch(pool)
+ case <-wp.stop:
+ return
+ }
+ }
+ }(wp, pool)
+
+ return wp
+}
+
+// Detach watcher from the pool.
+func (wch *watcher) Detach() {
+ close(wch.stop)
+}