From 2efc533f2aac215d487a80020b0f9bf4ae5209c3 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sat, 4 May 2019 19:24:25 +0300 Subject: watchers renamed to controllers --- service/container.go | 2 +- service/http/service.go | 4 +- service/watcher/config.go | 14 ++-- service/watcher/controller.go | 161 ++++++++++++++++++++++++++++++++++++++++ service/watcher/service.go | 18 ++--- service/watcher/state_filter.go | 58 +++++++++++++++ service/watcher/state_watch.go | 58 --------------- service/watcher/watcher.go | 161 ---------------------------------------- 8 files changed, 238 insertions(+), 238 deletions(-) create mode 100644 service/watcher/controller.go create mode 100644 service/watcher/state_filter.go delete mode 100644 service/watcher/state_watch.go delete mode 100644 service/watcher/watcher.go (limited to 'service') diff --git a/service/container.go b/service/container.go index abeaf369..ea1819d8 100644 --- a/service/container.go +++ b/service/container.go @@ -16,7 +16,7 @@ var errNoConfig = fmt.Errorf("no config has been provided") // implement service.HydrateConfig. const InitMethod = "Init" -// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept +// Service can serve. Services can provide Init method which must return (bool, error) signature and might accept // other services and/or configs as dependency. type Service interface { // Serve serves. diff --git a/service/http/service.go b/service/http/service.go index 800d3ca9..b76d8893 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -33,14 +33,14 @@ type Service struct { mdwr []middleware mu sync.Mutex rr *roadrunner.Server - watcher roadrunner.Watcher + watcher roadrunner.Controller handler *Handler http *http.Server https *http.Server } // Watch attaches watcher. -func (s *Service) Watch(w roadrunner.Watcher) { +func (s *Service) Watch(w roadrunner.Controller) { s.watcher = w } diff --git a/service/watcher/config.go b/service/watcher/config.go index 74be517a..8151005d 100644 --- a/service/watcher/config.go +++ b/service/watcher/config.go @@ -8,11 +8,11 @@ import ( // Configures set of Services. type Config struct { - // Interval defines the update duration for underlying watchers, default 1s. + // Interval defines the update duration for underlying controllers, default 1s. Interval time.Duration // Services declares list of services to be watched. - Services map[string]*watcherConfig + Services map[string]*controllerConfig } // Hydrate must populate Config values using given Config source. Must return error if Config is not valid. @@ -36,13 +36,13 @@ func (c *Config) InitDefaults() error { return nil } -// Watchers returns list of defined Services -func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) { - watchers = make(map[string]roadrunner.Watcher) +// Controllers returns list of defined Services +func (c *Config) Controllers(l listener) (controllers map[string]roadrunner.Controller) { + controllers = make(map[string]roadrunner.Controller) for name, cfg := range c.Services { - watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg} + controllers[name] = &controller{lsn: l, tick: c.Interval, cfg: cfg} } - return watchers + return controllers } diff --git a/service/watcher/controller.go b/service/watcher/controller.go new file mode 100644 index 00000000..38eddf84 --- /dev/null +++ b/service/watcher/controller.go @@ -0,0 +1,161 @@ +package watcher + +import ( + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" + "time" +) + +const ( + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory = iota + 8000 + + // EventMaxTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + EventMaxTTL + + // EventMaxIdleTTL triggered when worker spends too much time at rest. + EventMaxIdleTTL + + // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time). + EventMaxExecTTL +) + +// handles controller events +type listener func(event int, ctx interface{}) + +// defines the controller behaviour +type controllerConfig struct { + // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. + MaxMemory uint64 + + // TTL defines maximum time worker is allowed to live. + TTL int64 + + // MaxIdleTTL defines maximum duration worker can spend in idle mode. + MaxIdleTTL int64 + + // MaxExecTTL defines maximum lifetime per job. + MaxExecTTL int64 +} + +type controller struct { + lsn listener + tick time.Duration + cfg *controllerConfig + + // list of workers which are currently working + sw *stateFilter + + stop chan interface{} +} + +// control the pool state +func (c *controller) control(p roadrunner.Pool) { + c.loadWorkers(p) + + now := time.Now() + + if c.cfg.MaxExecTTL != 0 { + for _, w := range c.sw.find( + roadrunner.StateWorking, + now.Add(-time.Second*time.Duration(c.cfg.MaxExecTTL)), + ) { + eID := w.State().NumExecs() + err := fmt.Errorf("max exec time reached (%vs)", c.cfg.MaxExecTTL) + + // make sure worker still on initial request + if p.Remove(w, err) && w.State().NumExecs() == eID { + go w.Kill() + c.report(EventMaxExecTTL, w, err) + } + } + } + + // locale workers which are in idle mode for too long + if c.cfg.MaxIdleTTL != 0 { + for _, w := range c.sw.find( + roadrunner.StateReady, + now.Add(-time.Second*time.Duration(c.cfg.MaxIdleTTL)), + ) { + err := fmt.Errorf("max idle time reached (%vs)", c.cfg.MaxIdleTTL) + if p.Remove(w, err) { + c.report(EventMaxIdleTTL, w, err) + } + } + } +} + +func (c *controller) loadWorkers(p roadrunner.Pool) { + now := time.Now() + + for _, w := range p.Workers() { + if w.State().Value() == roadrunner.StateInvalid { + // skip duplicate assessment + continue + } + + s, err := util.WorkerState(w) + if err != nil { + continue + } + + if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) { + err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL) + if p.Remove(w, err) { + c.report(EventMaxTTL, w, err) + } + continue + } + + if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 { + err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory) + if p.Remove(w, err) { + c.report(EventMaxMemory, w, err) + } + continue + } + + // control the worker state changes + c.sw.push(w) + } + + c.sw.sync(now) +} + +// throw controller event +func (c *controller) report(event int, worker *roadrunner.Worker, caused error) { + if c.lsn != nil { + c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) + } +} + +// Attach controller to the pool +func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller { + wp := &controller{ + tick: c.tick, + lsn: c.lsn, + cfg: c.cfg, + sw: newStateFilter(), + stop: make(chan interface{}), + } + + go func(wp *controller, pool roadrunner.Pool) { + ticker := time.NewTicker(wp.tick) + for { + select { + case <-ticker.C: + wp.control(pool) + case <-wp.stop: + return + } + } + }(wp, pool) + + return wp +} + +// Detach controller from the pool. +func (c *controller) Detach() { + close(c.stop) +} diff --git a/service/watcher/service.go b/service/watcher/service.go index 0d419716..3db23b68 100644 --- a/service/watcher/service.go +++ b/service/watcher/service.go @@ -5,25 +5,25 @@ import ( "github.com/spiral/roadrunner/service" ) -// ID defines watcher service name. -const ID = "watch" +// ID defines controller service name. +const ID = "control" -// Watchable defines the ability to attach rr watcher. +// Watchable defines the ability to attach rr controller. type Watchable interface { - // Watch attaches watcher to the service. - Watch(w roadrunner.Watcher) + // Watch attaches controller to the service. + Watch(w roadrunner.Controller) } -// Services to watch the state of rr service inside other services. +// Services to control the state of rr service inside other services. type Service struct { cfg *Config lsns []func(event int, ctx interface{}) } -// Init watcher service +// Init controller service func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { // mount Services to designated services - for id, watcher := range cfg.Watchers(s.throw) { + for id, watcher := range cfg.Controllers(s.throw) { svc, _ := c.Get(id) if watchable, ok := svc.(Watchable); ok { watchable.Watch(watcher) @@ -33,7 +33,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { return true, nil } -// AddListener attaches server event watcher. +// AddListener attaches server event controller. func (s *Service) AddListener(l func(event int, ctx interface{})) { s.lsns = append(s.lsns, l) } diff --git a/service/watcher/state_filter.go b/service/watcher/state_filter.go new file mode 100644 index 00000000..d85f1308 --- /dev/null +++ b/service/watcher/state_filter.go @@ -0,0 +1,58 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "time" +) + +type stateFilter struct { + prev map[*roadrunner.Worker]state + next map[*roadrunner.Worker]state +} + +type state struct { + state int64 + numExecs int64 + since time.Time +} + +func newStateFilter() *stateFilter { + return &stateFilter{ + prev: make(map[*roadrunner.Worker]state), + next: make(map[*roadrunner.Worker]state), + } +} + +// add new worker to be watched +func (sw *stateFilter) push(w *roadrunner.Worker) { + sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()} +} + +// update worker states. +func (sw *stateFilter) sync(t time.Time) { + for w := range sw.prev { + if _, ok := sw.next[w]; !ok { + delete(sw.prev, w) + } + } + + for w, s := range sw.next { + ps, ok := sw.prev[w] + if !ok || ps.state != s.state || ps.numExecs != s.numExecs { + sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t} + } + + delete(sw.next, w) + } +} + +// find all workers which spend given amount of time in a specific state. +func (sw *stateFilter) find(state int64, since time.Time) (workers []*roadrunner.Worker) { + for w, s := range sw.prev { + if s.state == state && s.since.Before(since) { + workers = append(workers, w) + } + } + + return +} diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go deleted file mode 100644 index 3090d15d..00000000 --- a/service/watcher/state_watch.go +++ /dev/null @@ -1,58 +0,0 @@ -package watcher - -import ( - "github.com/spiral/roadrunner" - "time" -) - -type stateWatcher struct { - prev map[*roadrunner.Worker]state - next map[*roadrunner.Worker]state -} - -type state struct { - state int64 - numExecs int64 - since time.Time -} - -func newStateWatcher() *stateWatcher { - return &stateWatcher{ - prev: make(map[*roadrunner.Worker]state), - next: make(map[*roadrunner.Worker]state), - } -} - -// add new worker to be watched -func (sw *stateWatcher) push(w *roadrunner.Worker) { - sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()} -} - -// update worker states. -func (sw *stateWatcher) sync(t time.Time) { - for w := range sw.prev { - if _, ok := sw.next[w]; !ok { - delete(sw.prev, w) - } - } - - for w, s := range sw.next { - ps, ok := sw.prev[w] - if !ok || ps.state != s.state || ps.numExecs != s.numExecs { - sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t} - } - - delete(sw.next, w) - } -} - -// find all workers which spend given amount of time in a specific state. -func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) { - for w, s := range sw.prev { - if s.state == state && s.since.Before(since) { - workers = append(workers, w) - } - } - - return -} diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go deleted file mode 100644 index 65a2eeeb..00000000 --- a/service/watcher/watcher.go +++ /dev/null @@ -1,161 +0,0 @@ -package watcher - -import ( - "fmt" - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/util" - "time" -) - -const ( - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory = iota + 8000 - - // EventMaxTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError - EventMaxTTL - - // EventMaxIdleTTL triggered when worker spends too much time at rest. - EventMaxIdleTTL - - // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time). - EventMaxExecTTL -) - -// handles watcher events -type listener func(event int, ctx interface{}) - -// defines the watcher behaviour -type watcherConfig struct { - // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. - MaxMemory uint64 - - // TTL defines maximum time worker is allowed to live. - TTL int64 - - // MaxIdleTTL defines maximum duration worker can spend in idle mode. - MaxIdleTTL int64 - - // MaxExecTTL defines maximum lifetime per job. - MaxExecTTL int64 -} - -type watcher struct { - lsn listener - tick time.Duration - cfg *watcherConfig - - // list of workers which are currently working - sw *stateWatcher - - stop chan interface{} -} - -// watch the pool state -func (wch *watcher) watch(p roadrunner.Pool) { - wch.loadWorkers(p) - - now := time.Now() - - if wch.cfg.MaxExecTTL != 0 { - for _, w := range wch.sw.find( - roadrunner.StateWorking, - now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)), - ) { - eID := w.State().NumExecs() - err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL) - - // make sure worker still on initial request - if p.Remove(w, err) && w.State().NumExecs() == eID { - go w.Kill() - wch.report(EventMaxExecTTL, w, err) - } - } - } - - // locale workers which are in idle mode for too long - if wch.cfg.MaxIdleTTL != 0 { - for _, w := range wch.sw.find( - roadrunner.StateReady, - now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)), - ) { - err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL) - if p.Remove(w, err) { - wch.report(EventMaxIdleTTL, w, err) - } - } - } -} - -func (wch *watcher) loadWorkers(p roadrunner.Pool) { - now := time.Now() - - for _, w := range p.Workers() { - if w.State().Value() == roadrunner.StateInvalid { - // skip duplicate assessment - continue - } - - s, err := util.WorkerState(w) - if err != nil { - continue - } - - if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) { - err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL) - if p.Remove(w, err) { - wch.report(EventMaxTTL, w, err) - } - continue - } - - if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 { - err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory) - if p.Remove(w, err) { - wch.report(EventMaxMemory, w, err) - } - continue - } - - // watch the worker state changes - wch.sw.push(w) - } - - wch.sw.sync(now) -} - -// throw watcher event -func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) { - if wch.lsn != nil { - wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) - } -} - -// Attach watcher to the pool -func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { - wp := &watcher{ - tick: wch.tick, - lsn: wch.lsn, - cfg: wch.cfg, - sw: newStateWatcher(), - stop: make(chan interface{}), - } - - go func(wp *watcher, pool roadrunner.Pool) { - ticker := time.NewTicker(wp.tick) - for { - select { - case <-ticker.C: - wp.watch(pool) - case <-wp.stop: - return - } - } - }(wp, pool) - - return wp -} - -// Detach watcher from the pool. -func (wch *watcher) Detach() { - close(wch.stop) -} -- cgit v1.2.3