diff options
author | Wolfy-J <[email protected]> | 2019-05-04 19:24:25 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-04 19:24:25 +0300 |
commit | 2efc533f2aac215d487a80020b0f9bf4ae5209c3 (patch) | |
tree | a80a7a74dc4ca8c290b8b1bf1f6d24535b5ae3d7 /service/watcher/watcher.go | |
parent | 726b31008e73ab83d0582305c28a8cf62322e47a (diff) |
watchers renamed to controllers
Diffstat (limited to 'service/watcher/watcher.go')
-rw-r--r-- | service/watcher/watcher.go | 161 |
1 files changed, 0 insertions, 161 deletions
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go deleted file mode 100644 index 65a2eeeb..00000000 --- a/service/watcher/watcher.go +++ /dev/null @@ -1,161 +0,0 @@ -package watcher - -import ( - "fmt" - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/util" - "time" -) - -const ( - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory = iota + 8000 - - // EventMaxTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError - EventMaxTTL - - // EventMaxIdleTTL triggered when worker spends too much time at rest. - EventMaxIdleTTL - - // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time). - EventMaxExecTTL -) - -// handles watcher events -type listener func(event int, ctx interface{}) - -// defines the watcher behaviour -type watcherConfig struct { - // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. - MaxMemory uint64 - - // TTL defines maximum time worker is allowed to live. - TTL int64 - - // MaxIdleTTL defines maximum duration worker can spend in idle mode. - MaxIdleTTL int64 - - // MaxExecTTL defines maximum lifetime per job. - MaxExecTTL int64 -} - -type watcher struct { - lsn listener - tick time.Duration - cfg *watcherConfig - - // list of workers which are currently working - sw *stateWatcher - - stop chan interface{} -} - -// watch the pool state -func (wch *watcher) watch(p roadrunner.Pool) { - wch.loadWorkers(p) - - now := time.Now() - - if wch.cfg.MaxExecTTL != 0 { - for _, w := range wch.sw.find( - roadrunner.StateWorking, - now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)), - ) { - eID := w.State().NumExecs() - err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL) - - // make sure worker still on initial request - if p.Remove(w, err) && w.State().NumExecs() == eID { - go w.Kill() - wch.report(EventMaxExecTTL, w, err) - } - } - } - - // locale workers which are in idle mode for too long - if wch.cfg.MaxIdleTTL != 0 { - for _, w := range wch.sw.find( - roadrunner.StateReady, - now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)), - ) { - err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL) - if p.Remove(w, err) { - wch.report(EventMaxIdleTTL, w, err) - } - } - } -} - -func (wch *watcher) loadWorkers(p roadrunner.Pool) { - now := time.Now() - - for _, w := range p.Workers() { - if w.State().Value() == roadrunner.StateInvalid { - // skip duplicate assessment - continue - } - - s, err := util.WorkerState(w) - if err != nil { - continue - } - - if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) { - err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL) - if p.Remove(w, err) { - wch.report(EventMaxTTL, w, err) - } - continue - } - - if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 { - err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory) - if p.Remove(w, err) { - wch.report(EventMaxMemory, w, err) - } - continue - } - - // watch the worker state changes - wch.sw.push(w) - } - - wch.sw.sync(now) -} - -// throw watcher event -func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) { - if wch.lsn != nil { - wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) - } -} - -// Attach watcher to the pool -func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { - wp := &watcher{ - tick: wch.tick, - lsn: wch.lsn, - cfg: wch.cfg, - sw: newStateWatcher(), - stop: make(chan interface{}), - } - - go func(wp *watcher, pool roadrunner.Pool) { - ticker := time.NewTicker(wp.tick) - for { - select { - case <-ticker.C: - wp.watch(pool) - case <-wp.stop: - return - } - } - }(wp, pool) - - return wp -} - -// Detach watcher from the pool. -func (wch *watcher) Detach() { - close(wch.stop) -} |