summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 23:38:03 +0300
committerWolfy-J <[email protected]>2019-05-03 23:38:03 +0300
commit6d5b883247f0eb621ca51da791664a28c8539a52 (patch)
tree00ebb5e92799c60ac4050f634b0773969384f3ae
parent4e4f0bfb8be5d26772060bc77256cea7cbf68138 (diff)
watching
-rw-r--r--cmd/util/debug.go18
-rw-r--r--service/watcher/config.go9
-rw-r--r--service/watcher/state_watch.go58
-rw-r--r--service/watcher/watcher.go124
4 files changed, 161 insertions, 48 deletions
diff --git a/cmd/util/debug.go b/cmd/util/debug.go
index 01082112..c120eba2 100644
--- a/cmd/util/debug.go
+++ b/cmd/util/debug.go
@@ -69,6 +69,15 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
))
return true
+ case watcher.EventMaxIdleTTL:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Debug(Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+
case watcher.EventMaxMemory:
w := ctx.(roadrunner.WorkerError)
logger.Error(Sprintf(
@@ -77,6 +86,15 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
w.Caused,
))
return true
+
+ case watcher.EventMaxExecTTL:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Error(Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
}
return false
diff --git a/service/watcher/config.go b/service/watcher/config.go
index dcd31777..74be517a 100644
--- a/service/watcher/config.go
+++ b/service/watcher/config.go
@@ -1,7 +1,6 @@
package watcher
import (
- "fmt"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service"
"time"
@@ -27,12 +26,6 @@ func (c *Config) Hydrate(cfg service.Config) error {
c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds())
}
- for name, cfg := range c.Services {
- if err := cfg.Normalize(); err != nil {
- return fmt.Errorf("invalid watcher `%s`: %s", name, err.Error())
- }
- }
-
return nil
}
@@ -48,7 +41,7 @@ func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) {
watchers = make(map[string]roadrunner.Watcher)
for name, cfg := range c.Services {
- watchers[name] = &watcher{lsn: l, interval: c.Interval, cfg: cfg}
+ watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg}
}
return watchers
diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go
new file mode 100644
index 00000000..3090d15d
--- /dev/null
+++ b/service/watcher/state_watch.go
@@ -0,0 +1,58 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+type stateWatcher struct {
+ prev map[*roadrunner.Worker]state
+ next map[*roadrunner.Worker]state
+}
+
+type state struct {
+ state int64
+ numExecs int64
+ since time.Time
+}
+
+func newStateWatcher() *stateWatcher {
+ return &stateWatcher{
+ prev: make(map[*roadrunner.Worker]state),
+ next: make(map[*roadrunner.Worker]state),
+ }
+}
+
+// add new worker to be watched
+func (sw *stateWatcher) push(w *roadrunner.Worker) {
+ sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateWatcher) sync(t time.Time) {
+ for w := range sw.prev {
+ if _, ok := sw.next[w]; !ok {
+ delete(sw.prev, w)
+ }
+ }
+
+ for w, s := range sw.next {
+ ps, ok := sw.prev[w]
+ if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+ sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+ }
+
+ delete(sw.next, w)
+ }
+}
+
+// find all workers which spend given amount of time in a specific state.
+func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+ for w, s := range sw.prev {
+ if s.state == state && s.since.Before(since) {
+ workers = append(workers, w)
+ }
+ }
+
+ return
+}
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
index 63dce3d5..08d477fa 100644
--- a/service/watcher/watcher.go
+++ b/service/watcher/watcher.go
@@ -8,11 +8,17 @@ import (
)
const (
- // EventMaxTTL thrown when worker is removed due MaxTTL being reached. Context is roadrunner.WorkerError
- EventMaxTTL = iota + 8000
-
// EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
+ EventMaxMemory = iota + 8000
+
+ // EventMaxTTL thrown when worker is removed due TTL being reached. Context is roadrunner.WorkerError
+ EventMaxTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time at rest.
+ EventMaxIdleTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventMaxExecTTL
)
// handles watcher events
@@ -20,76 +26,114 @@ type listener func(event int, ctx interface{})
// defines the watcher behaviour
type watcherConfig struct {
- // MaxTTL defines maximum time worker is allowed to live.
- MaxTTL time.Duration
-
// MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
MaxMemory uint64
-}
-// Normalize watcher config and upscale the durations.
-func (c *watcherConfig) Normalize() error {
- // Always use second based definition for time durations
- if c.MaxTTL < time.Microsecond {
- c.MaxTTL = time.Second * time.Duration(c.MaxTTL.Nanoseconds())
- }
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // MaxIdleTTL defines maximum duration worker can spend in idle mode.
+ MaxIdleTTL int64
- return nil
+ // MaxExecTTL defines maximum lifetime per job.
+ MaxExecTTL int64
}
type watcher struct {
- lsn listener
- interval time.Duration
- cfg *watcherConfig
- stop chan interface{}
+ lsn listener
+ tick time.Duration
+ cfg *watcherConfig
// list of workers which are currently working
- //working map[*roadrunner.Worker]time.Time
+ sw *stateWatcher
+
+ stop chan interface{}
}
// watch the pool state
-func (watch *watcher) watch(p roadrunner.Pool) {
+func (wch *watcher) watch(p roadrunner.Pool) {
now := time.Now()
+
for _, w := range p.Workers() {
- if watch.cfg.MaxTTL != 0 && now.Sub(w.Created) >= watch.cfg.MaxTTL {
- err := fmt.Errorf("max TTL reached (%s)", watch.cfg.MaxTTL)
+ if w.State().Value() == roadrunner.StateInvalid {
+ // skip duplicate assessment
+ continue
+ }
+
+ s, err := util.WorkerState(w)
+ if err != nil {
+ continue
+ }
+
+ if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
+ err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
if p.Remove(w, err) {
- watch.report(EventMaxTTL, w, err)
+ wch.report(EventMaxTTL, w, err)
}
+ continue
}
- state, err := util.WorkerState(w)
- if err != nil {
+ if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
+ if p.Remove(w, err) {
+ wch.report(EventMaxMemory, w, err)
+ }
continue
}
- if watch.cfg.MaxMemory != 0 && state.MemoryUsage >= watch.cfg.MaxMemory*1024*1024 {
- err := fmt.Errorf("max allowed memory reached (%vMB)", watch.cfg.MaxMemory)
+ // watch the worker state changes
+ wch.sw.push(w)
+ }
+
+ wch.sw.sync(now)
+
+ if wch.cfg.MaxExecTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateWorking,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+ ) {
+ err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+ if p.Remove(w, err) {
+ // brutally
+ go w.Kill()
+ wch.report(EventMaxExecTTL, w, err)
+ }
+ }
+ }
+
+ // locale workers which are in idle mode for too long
+ if wch.cfg.MaxIdleTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateReady,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+ ) {
+ err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
if p.Remove(w, err) {
- watch.report(EventMaxMemory, w, err)
+ wch.report(EventMaxIdleTTL, w, err)
}
}
}
}
// throw watcher event
-func (watch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
- if watch.lsn != nil {
- watch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
+ if wch.lsn != nil {
+ wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
}
}
// Attach watcher to the pool
-func (watch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
+func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
wp := &watcher{
- interval: watch.interval,
- lsn: watch.lsn,
- cfg: watch.cfg,
- stop: make(chan interface{}),
+ tick: wch.tick,
+ lsn: wch.lsn,
+ cfg: wch.cfg,
+ sw: newStateWatcher(),
+ stop: make(chan interface{}),
}
go func(wp *watcher, pool roadrunner.Pool) {
- ticker := time.NewTicker(wp.interval)
+ ticker := time.NewTicker(wp.tick)
for {
select {
case <-ticker.C:
@@ -104,6 +148,6 @@ func (watch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
}
// Detach watcher from the pool.
-func (watch *watcher) Detach() {
- close(watch.stop)
+func (wch *watcher) Detach() {
+ close(wch.stop)
}