author | Wolfy-J <[email protected]> | 2019-05-03 23:38:03 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-03 23:38:03 +0300 |
commit | 6d5b883247f0eb621ca51da791664a28c8539a52 (patch) | |
tree | 00ebb5e92799c60ac4050f634b0773969384f3ae | |
parent | 4e4f0bfb8be5d26772060bc77256cea7cbf68138 (diff) | |
watching
-rw-r--r-- | cmd/util/debug.go | 18 |
-rw-r--r-- | service/watcher/config.go | 9 |
-rw-r--r-- | service/watcher/state_watch.go | 58 |
-rw-r--r-- | service/watcher/watcher.go | 124 |
4 files changed, 161 insertions, 48 deletions
```diff
diff --git a/cmd/util/debug.go b/cmd/util/debug.go
index 01082112..c120eba2 100644
--- a/cmd/util/debug.go
+++ b/cmd/util/debug.go
@@ -69,6 +69,15 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
 		))
 		return true
 
+	case watcher.EventMaxIdleTTL:
+		w := ctx.(roadrunner.WorkerError)
+		logger.Debug(Sprintf(
+			"<white+hb>worker.%v</reset> <yellow>%s</reset>",
+			*w.Worker.Pid,
+			w.Caused,
+		))
+		return true
+
 	case watcher.EventMaxMemory:
 		w := ctx.(roadrunner.WorkerError)
 		logger.Error(Sprintf(
@@ -77,6 +86,15 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
 			w.Caused,
 		))
 		return true
+
+	case watcher.EventMaxExecTTL:
+		w := ctx.(roadrunner.WorkerError)
+		logger.Error(Sprintf(
+			"<white+hb>worker.%v</reset> <red>%s</reset>",
+			*w.Worker.Pid,
+			w.Caused,
+		))
+		return true
 	}
 
 	return false
diff --git a/service/watcher/config.go b/service/watcher/config.go
index dcd31777..74be517a 100644
--- a/service/watcher/config.go
+++ b/service/watcher/config.go
@@ -1,7 +1,6 @@
 package watcher
 
 import (
-	"fmt"
 	"github.com/spiral/roadrunner"
 	"github.com/spiral/roadrunner/service"
 	"time"
@@ -27,12 +26,6 @@ func (c *Config) Hydrate(cfg service.Config) error {
 		c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds())
 	}
 
-	for name, cfg := range c.Services {
-		if err := cfg.Normalize(); err != nil {
-			return fmt.Errorf("invalid watcher `%s`: %s", name, err.Error())
-		}
-	}
-
 	return nil
 }
 
@@ -48,7 +41,7 @@ func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) {
 	watchers = make(map[string]roadrunner.Watcher)
 	for name, cfg := range c.Services {
-		watchers[name] = &watcher{lsn: l, interval: c.Interval, cfg: cfg}
+		watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg}
 	}
 
 	return watchers
diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go
new file mode 100644
index 00000000..3090d15d
--- /dev/null
+++ b/service/watcher/state_watch.go
@@ -0,0 +1,58 @@
+package watcher
+
+import (
+	"github.com/spiral/roadrunner"
+	"time"
+)
+
+type stateWatcher struct {
+	prev map[*roadrunner.Worker]state
+	next map[*roadrunner.Worker]state
+}
+
+type state struct {
+	state    int64
+	numExecs int64
+	since    time.Time
+}
+
+func newStateWatcher() *stateWatcher {
+	return &stateWatcher{
+		prev: make(map[*roadrunner.Worker]state),
+		next: make(map[*roadrunner.Worker]state),
+	}
+}
+
+// add new worker to be watched
+func (sw *stateWatcher) push(w *roadrunner.Worker) {
+	sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateWatcher) sync(t time.Time) {
+	for w := range sw.prev {
+		if _, ok := sw.next[w]; !ok {
+			delete(sw.prev, w)
+		}
+	}
+
+	for w, s := range sw.next {
+		ps, ok := sw.prev[w]
+		if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+			sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+		}
+
+		delete(sw.next, w)
+	}
+}
+
+// find all workers that have spent the given amount of time in a specific state.
+func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+	for w, s := range sw.prev {
+		if s.state == state && s.since.Before(since) {
+			workers = append(workers, w)
+		}
+	}
+
+	return
+}
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
index 63dce3d5..08d477fa 100644
--- a/service/watcher/watcher.go
+++ b/service/watcher/watcher.go
@@ -8,11 +8,17 @@ import (
 )
 
 const (
-	// EventMaxTTL thrown when worker is removed due MaxTTL being reached. Context is roadrunner.WorkerError
-	EventMaxTTL = iota + 8000
-	// EventMaxMemory caused when worker consumes more memory than allowed.
-	EventMaxMemory
+	EventMaxMemory = iota + 8000
+
+	// EventMaxTTL thrown when worker is removed due to TTL being reached. Context is roadrunner.WorkerError
+	EventMaxTTL
+
+	// EventMaxIdleTTL triggered when worker spends too much time at rest.
+	EventMaxIdleTTL
+
+	// EventMaxExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+	EventMaxExecTTL
 )
 
 // handles watcher events
@@ -20,76 +26,114 @@ type listener func(event int, ctx interface{})
 
 // defines the watcher behaviour
 type watcherConfig struct {
-	// MaxTTL defines maximum time worker is allowed to live.
-	MaxTTL time.Duration
-
 	// MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
 	MaxMemory uint64
-}
 
-// Normalize watcher config and upscale the durations.
-func (c *watcherConfig) Normalize() error {
-	// Always use second based definition for time durations
-	if c.MaxTTL < time.Microsecond {
-		c.MaxTTL = time.Second * time.Duration(c.MaxTTL.Nanoseconds())
-	}
+	// TTL defines maximum time worker is allowed to live.
+	TTL int64
+
+	// MaxIdleTTL defines maximum duration worker can spend in idle mode.
+	MaxIdleTTL int64
 
-	return nil
+	// MaxExecTTL defines maximum lifetime per job.
+	MaxExecTTL int64
 }
 
 type watcher struct {
-	lsn      listener
-	interval time.Duration
-	cfg      *watcherConfig
-	stop     chan interface{}
+	lsn  listener
+	tick time.Duration
+	cfg  *watcherConfig
 
 	// list of workers which are currently working
-	//working map[*roadrunner.Worker]time.Time
+	sw *stateWatcher
+
+	stop chan interface{}
 }
 
 // watch the pool state
-func (watch *watcher) watch(p roadrunner.Pool) {
+func (wch *watcher) watch(p roadrunner.Pool) {
 	now := time.Now()
+
 	for _, w := range p.Workers() {
-		if watch.cfg.MaxTTL != 0 && now.Sub(w.Created) >= watch.cfg.MaxTTL {
-			err := fmt.Errorf("max TTL reached (%s)", watch.cfg.MaxTTL)
+		if w.State().Value() == roadrunner.StateInvalid {
+			// skip duplicate assessment
+			continue
+		}
+
+		s, err := util.WorkerState(w)
+		if err != nil {
+			continue
+		}
+
+		if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
+			err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
 			if p.Remove(w, err) {
-				watch.report(EventMaxTTL, w, err)
+				wch.report(EventMaxTTL, w, err)
 			}
+			continue
 		}
 
-		state, err := util.WorkerState(w)
-		if err != nil {
+		if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
+			err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
+			if p.Remove(w, err) {
+				wch.report(EventMaxMemory, w, err)
+			}
 			continue
 		}
 
-		if watch.cfg.MaxMemory != 0 && state.MemoryUsage >= watch.cfg.MaxMemory*1024*1024 {
-			err := fmt.Errorf("max allowed memory reached (%vMB)", watch.cfg.MaxMemory)
+		// watch the worker state changes
+		wch.sw.push(w)
+	}
+
+	wch.sw.sync(now)
+
+	if wch.cfg.MaxExecTTL != 0 {
+		for _, w := range wch.sw.find(
+			roadrunner.StateWorking,
+			now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+		) {
+			err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+			if p.Remove(w, err) {
+				// brutally
+				go w.Kill()
+				wch.report(EventMaxExecTTL, w, err)
+			}
+		}
+	}
+
+	// locate workers which are in idle mode for too long
+	if wch.cfg.MaxIdleTTL != 0 {
+		for _, w := range wch.sw.find(
+			roadrunner.StateReady,
+			now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+		) {
+			err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
 			if p.Remove(w, err) {
-				watch.report(EventMaxMemory, w, err)
+				wch.report(EventMaxIdleTTL, w, err)
 			}
 		}
 	}
 }
 
 // throw watcher event
-func (watch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
-	if watch.lsn != nil {
-		watch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
+	if wch.lsn != nil {
+		wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
 	}
 }
 
 // Attach watcher to the pool
-func (watch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
+func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
 	wp := &watcher{
-		interval: watch.interval,
-		lsn:      watch.lsn,
-		cfg:      watch.cfg,
-		stop:     make(chan interface{}),
+		tick: wch.tick,
+		lsn:  wch.lsn,
+		cfg:  wch.cfg,
+		sw:   newStateWatcher(),
+		stop: make(chan interface{}),
	}
 
 	go func(wp *watcher, pool roadrunner.Pool) {
-		ticker := time.NewTicker(wp.interval)
+		ticker := time.NewTicker(wp.tick)
 		for {
 			select {
 			case <-ticker.C:
@@ -104,6 +148,6 @@ func (watch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
 }
 
 // Detach watcher from the pool.
-func (watch *watcher) Detach() {
-	close(watch.stop)
+func (wch *watcher) Detach() {
+	close(wch.stop)
 }
```
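The core of this change is the prev/next bookkeeping in `state_watch.go`: on every tick the watcher pushes a snapshot of each worker, `sync()` refreshes the `since` timestamp only when the observed state or exec counter actually changed, and `find()` then returns workers that have sat in one state since before a cutoff. The sketch below is not part of the commit; it replays the same logic with plain string IDs instead of `*roadrunner.Worker` so it can run standalone, and the `tracker`/`snapshot` names and the state constant are illustrative assumptions only.

```go
package main

import (
	"fmt"
	"time"
)

// snapshot mirrors the `state` struct from state_watch.go, but is keyed by a
// plain worker ID instead of *roadrunner.Worker so the sketch runs standalone.
type snapshot struct {
	state    int64
	numExecs int64
	since    time.Time
}

type tracker struct {
	prev map[string]snapshot // last confirmed observation per worker
	next map[string]snapshot // observations collected during the current tick
}

func newTracker() *tracker {
	return &tracker{prev: map[string]snapshot{}, next: map[string]snapshot{}}
}

// push records how a worker looks right now (called once per worker per tick).
func (t *tracker) push(id string, state, numExecs int64) {
	t.next[id] = snapshot{state: state, numExecs: numExecs}
}

// sync drops workers that disappeared and refreshes `since` only when the
// observed state or exec counter changed; an unchanged worker keeps
// accumulating time in its current state.
func (t *tracker) sync(now time.Time) {
	for id := range t.prev {
		if _, ok := t.next[id]; !ok {
			delete(t.prev, id)
		}
	}
	for id, s := range t.next {
		ps, ok := t.prev[id]
		if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
			t.prev[id] = snapshot{state: s.state, numExecs: s.numExecs, since: now}
		}
		delete(t.next, id)
	}
}

// find returns workers that have been in `state` since before the cutoff.
func (t *tracker) find(state int64, cutoff time.Time) (ids []string) {
	for id, s := range t.prev {
		if s.state == state && s.since.Before(cutoff) {
			ids = append(ids, id)
		}
	}
	return
}

func main() {
	const stateWorking int64 = 2 // placeholder value, not roadrunner's constant

	t0 := time.Now()
	tr := newTracker()

	// tick 1: worker-1 observed busy on exec #7
	tr.push("worker-1", stateWorking, 7)
	tr.sync(t0)

	// tick 2, one minute later: same state, same exec counter -> `since` stays at t0
	tr.push("worker-1", stateWorking, 7)
	tr.sync(t0.Add(time.Minute))

	// anything still working since before t0+30s has exceeded a 30s exec budget
	fmt.Println(tr.find(stateWorking, t0.Add(30*time.Second))) // [worker-1]
}
```

Because `since` is left untouched while nothing changes, a worker that keeps reporting the same working state across ticks keeps aging in that state, which is what the `MaxExecTTL` and `MaxIdleTTL` checks in `watcher.go` rely on.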