summaryrefslogtreecommitdiff
path: root/service/watcher/watcher.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-04 19:24:25 +0300
committerWolfy-J <[email protected]>2019-05-04 19:24:25 +0300
commit2efc533f2aac215d487a80020b0f9bf4ae5209c3 (patch)
treea80a7a74dc4ca8c290b8b1bf1f6d24535b5ae3d7 /service/watcher/watcher.go
parent726b31008e73ab83d0582305c28a8cf62322e47a (diff)
watchers renamed to controllers
Diffstat (limited to 'service/watcher/watcher.go')
-rw-r--r--service/watcher/watcher.go161
1 files changed, 0 insertions, 161 deletions
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
deleted file mode 100644
index 65a2eeeb..00000000
--- a/service/watcher/watcher.go
+++ /dev/null
@@ -1,161 +0,0 @@
-package watcher
-
-import (
- "fmt"
- "github.com/spiral/roadrunner"
- "github.com/spiral/roadrunner/util"
- "time"
-)
-
-const (
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory = iota + 8000
-
- // EventMaxTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
- EventMaxTTL
-
- // EventMaxIdleTTL triggered when worker spends too much time at rest.
- EventMaxIdleTTL
-
- // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventMaxExecTTL
-)
-
-// handles watcher events
-type listener func(event int, ctx interface{})
-
-// defines the watcher behaviour
-type watcherConfig struct {
- // MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
- MaxMemory uint64
-
- // TTL defines maximum time worker is allowed to live.
- TTL int64
-
- // MaxIdleTTL defines maximum duration worker can spend in idle mode.
- MaxIdleTTL int64
-
- // MaxExecTTL defines maximum lifetime per job.
- MaxExecTTL int64
-}
-
-type watcher struct {
- lsn listener
- tick time.Duration
- cfg *watcherConfig
-
- // list of workers which are currently working
- sw *stateWatcher
-
- stop chan interface{}
-}
-
-// watch the pool state
-func (wch *watcher) watch(p roadrunner.Pool) {
- wch.loadWorkers(p)
-
- now := time.Now()
-
- if wch.cfg.MaxExecTTL != 0 {
- for _, w := range wch.sw.find(
- roadrunner.StateWorking,
- now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
- ) {
- eID := w.State().NumExecs()
- err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
-
- // make sure worker still on initial request
- if p.Remove(w, err) && w.State().NumExecs() == eID {
- go w.Kill()
- wch.report(EventMaxExecTTL, w, err)
- }
- }
- }
-
- // locale workers which are in idle mode for too long
- if wch.cfg.MaxIdleTTL != 0 {
- for _, w := range wch.sw.find(
- roadrunner.StateReady,
- now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
- ) {
- err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
- if p.Remove(w, err) {
- wch.report(EventMaxIdleTTL, w, err)
- }
- }
- }
-}
-
-func (wch *watcher) loadWorkers(p roadrunner.Pool) {
- now := time.Now()
-
- for _, w := range p.Workers() {
- if w.State().Value() == roadrunner.StateInvalid {
- // skip duplicate assessment
- continue
- }
-
- s, err := util.WorkerState(w)
- if err != nil {
- continue
- }
-
- if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
- err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
- if p.Remove(w, err) {
- wch.report(EventMaxTTL, w, err)
- }
- continue
- }
-
- if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
- err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
- if p.Remove(w, err) {
- wch.report(EventMaxMemory, w, err)
- }
- continue
- }
-
- // watch the worker state changes
- wch.sw.push(w)
- }
-
- wch.sw.sync(now)
-}
-
-// throw watcher event
-func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
- if wch.lsn != nil {
- wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
- }
-}
-
-// Attach watcher to the pool
-func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
- wp := &watcher{
- tick: wch.tick,
- lsn: wch.lsn,
- cfg: wch.cfg,
- sw: newStateWatcher(),
- stop: make(chan interface{}),
- }
-
- go func(wp *watcher, pool roadrunner.Pool) {
- ticker := time.NewTicker(wp.tick)
- for {
- select {
- case <-ticker.C:
- wp.watch(pool)
- case <-wp.stop:
- return
- }
- }
- }(wp, pool)
-
- return wp
-}
-
-// Detach watcher from the pool.
-func (wch *watcher) Detach() {
- close(wch.stop)
-}