summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--service/watcher/watcher.go66
-rw-r--r--watcher_test.go2
2 files changed, 36 insertions, 32 deletions
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
index 788ee703..65a2eeeb 100644
--- a/service/watcher/watcher.go
+++ b/service/watcher/watcher.go
@@ -52,6 +52,41 @@ type watcher struct {
// watch the pool state
func (wch *watcher) watch(p roadrunner.Pool) {
+ wch.loadWorkers(p)
+
+ now := time.Now()
+
+ if wch.cfg.MaxExecTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateWorking,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+ ) {
+ eID := w.State().NumExecs()
+ err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+
+ // make sure worker still on initial request
+ if p.Remove(w, err) && w.State().NumExecs() == eID {
+ go w.Kill()
+ wch.report(EventMaxExecTTL, w, err)
+ }
+ }
+ }
+
+ // locale workers which are in idle mode for too long
+ if wch.cfg.MaxIdleTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateReady,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+ ) {
+ err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxIdleTTL, w, err)
+ }
+ }
+ }
+}
+
+func (wch *watcher) loadWorkers(p roadrunner.Pool) {
now := time.Now()
for _, w := range p.Workers() {
@@ -86,37 +121,6 @@ func (wch *watcher) watch(p roadrunner.Pool) {
}
wch.sw.sync(now)
-
- if wch.cfg.MaxExecTTL != 0 {
- for _, w := range wch.sw.find(
- roadrunner.StateWorking,
- now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
- ) {
- eID := w.State().NumExecs()
- err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
-
- if p.Remove(w, err) {
- // make sure worker still on initial request
- if w.State().NumExecs() == eID {
- go w.Kill()
- wch.report(EventMaxExecTTL, w, err)
- }
- }
- }
- }
-
- // locale workers which are in idle mode for too long
- if wch.cfg.MaxIdleTTL != 0 {
- for _, w := range wch.sw.find(
- roadrunner.StateReady,
- now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
- ) {
- err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
- if p.Remove(w, err) {
- wch.report(EventMaxIdleTTL, w, err)
- }
- }
- }
}
// throw watcher event
diff --git a/watcher_test.go b/watcher_test.go
index 94ac591e..ac0523ac 100644
--- a/watcher_test.go
+++ b/watcher_test.go
@@ -211,6 +211,6 @@ func Test_RemoveWorkerAfterTask(t *testing.T) {
// must be replaced
assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0]))
- // must not be registered withing pool
+ // must not be registered within the pool
rr.pWatcher.(*eWatcher).remove(wr, nil)
}