diff options
author | Wolfy-J <[email protected]> | 2019-05-04 19:24:25 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-04 19:24:25 +0300 |
commit | 2efc533f2aac215d487a80020b0f9bf4ae5209c3 (patch) | |
tree | a80a7a74dc4ca8c290b8b1bf1f6d24535b5ae3d7 | |
parent | 726b31008e73ab83d0582305c28a8cf62322e47a (diff) |
watchers renamed to controllers
-rw-r--r-- | CHANGELOG.md | 2 | ||||
-rw-r--r-- | cmd/util/cprint.go | 4 | ||||
-rw-r--r-- | controller.go | 10 | ||||
-rw-r--r-- | controller_test.go (renamed from watcher_test.go) | 28 | ||||
-rw-r--r-- | pool.go | 2 | ||||
-rw-r--r-- | server.go | 42 | ||||
-rw-r--r-- | service/container.go | 2 | ||||
-rw-r--r-- | service/http/service.go | 4 | ||||
-rw-r--r-- | service/watcher/config.go | 14 | ||||
-rw-r--r-- | service/watcher/controller.go (renamed from service/watcher/watcher.go) | 90 | ||||
-rw-r--r-- | service/watcher/service.go | 18 | ||||
-rw-r--r-- | service/watcher/state_filter.go (renamed from service/watcher/state_watch.go) | 12 | ||||
-rw-r--r-- | static_pool.go | 2 | ||||
-rw-r--r-- | watcher.go | 10 |
14 files changed, 121 insertions, 119 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b6f380b..59091bc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ v1.4.0 - the ability to safely remove the worker from the pool in runtime - minor performance improvements - `real ip` resolution using X-Real-Ip and X-Forwarded-For (+cidr verification) -- automatic worker lifecycle manager (watcher) +- automatic worker lifecycle manager (controller) - maxMemory (graceful stop) - maxTTL (graceful stop) - maxIdleTTL (graceful stop) diff --git a/cmd/util/cprint.go b/cmd/util/cprint.go index 5bf8a8ca..9990a038 100644 --- a/cmd/util/cprint.go +++ b/cmd/util/cprint.go @@ -8,7 +8,9 @@ import ( ) var ( - reg *regexp.Regexp + reg *regexp.Regexp + + // Colorize enables colors support. Colorize bool ) diff --git a/controller.go b/controller.go new file mode 100644 index 00000000..bda7ad6b --- /dev/null +++ b/controller.go @@ -0,0 +1,10 @@ +package roadrunner + +// Controller observes pool state and decides if any worker must be destroyed. +type Controller interface { + // Lock controller on given pool instance. + Attach(p Pool) Controller + + // Detach pool watching. + Detach() +} diff --git a/watcher_test.go b/controller_test.go index ac0523ac..031c2f31 100644 --- a/watcher_test.go +++ b/controller_test.go @@ -14,7 +14,7 @@ type eWatcher struct { onDetach func(p Pool) } -func (w *eWatcher) Attach(p Pool) Watcher { +func (w *eWatcher) Attach(p Pool) Controller { wp := &eWatcher{p: p, onAttach: w.onAttach, onDetach: w.onDetach} if wp.onAttach != nil { @@ -50,8 +50,8 @@ func Test_WatcherWatch(t *testing.T) { rr.Watch(&eWatcher{}) assert.NoError(t, rr.Start()) - assert.NotNil(t, rr.pWatcher) - assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + assert.NotNil(t, rr.pController) + assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool) res, err := rr.Exec(&Payload{Body: []byte("hello")}) @@ -79,16 +79,16 @@ func Test_WatcherReattach(t *testing.T) { rr.Watch(&eWatcher{}) assert.NoError(t, rr.Start()) - assert.NotNil(t, rr.pWatcher) - assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + assert.NotNil(t, rr.pController) + assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool) - oldWatcher := rr.pWatcher + oldWatcher := rr.pController assert.NoError(t, rr.Reset()) - assert.NotNil(t, rr.pWatcher) - assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) - assert.NotEqual(t, oldWatcher, rr.pWatcher) + assert.NotNil(t, rr.pController) + assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool) + assert.NotEqual(t, oldWatcher, rr.pController) res, err := rr.Exec(&Payload{Body: []byte("hello")}) @@ -125,8 +125,8 @@ func Test_WatcherAttachDetachSequence(t *testing.T) { }) assert.NoError(t, rr.Start()) - assert.NotNil(t, rr.pWatcher) - assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool) + assert.NotNil(t, rr.pController) + assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool) res, err := rr.Exec(&Payload{Body: []byte("hello")}) @@ -161,7 +161,7 @@ func Test_RemoveWorkerOnAllocation(t *testing.T) { assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String()) lastPid := res.String() - rr.pWatcher.(*eWatcher).remove(wr, nil) + rr.pController.(*eWatcher).remove(wr, nil) res, err = rr.Exec(&Payload{Body: []byte("hello")}) assert.NoError(t, err) @@ -204,7 +204,7 @@ func Test_RemoveWorkerAfterTask(t *testing.T) { // wait for worker execution to be in progress time.Sleep(time.Millisecond * 250) - rr.pWatcher.(*eWatcher).remove(wr, nil) + rr.pController.(*eWatcher).remove(wr, nil) <-wait @@ -212,5 +212,5 @@ func Test_RemoveWorkerAfterTask(t *testing.T) { assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0])) // must not be registered within the pool - rr.pWatcher.(*eWatcher).remove(wr, nil) + rr.pController.(*eWatcher).remove(wr, nil) } @@ -22,7 +22,7 @@ const ( // Pool managed set of inner worker processes. type Pool interface { - // Listen all caused events to attached watcher. + // Listen all caused events to attached controller. Listen(l func(event int, ctx interface{})) // Exec one task with given payload and context, returns result or error. @@ -37,13 +37,13 @@ type Server struct { // creates and connects to workers factory Factory - // associated pool watcher - watcher Watcher + // associated pool controller + controller Controller // currently active pool instance - mup sync.Mutex - pool Pool - pWatcher Watcher + mup sync.Mutex + pool Pool + pController Controller // observes pool events (can be attached to multiple pools at the same time) mul sync.Mutex @@ -55,7 +55,7 @@ func NewServer(cfg *ServerConfig) *Server { return &Server{cfg: cfg} } -// Listen attaches server event watcher. +// Listen attaches server event controller. func (s *Server) Listen(l func(event int, ctx interface{})) { s.mul.Lock() defer s.mul.Unlock() @@ -63,17 +63,17 @@ func (s *Server) Listen(l func(event int, ctx interface{})) { s.lsn = l } -// Listen attaches server event watcher. -func (s *Server) Watch(w Watcher) { +// Watch attaches worker controller. +func (s *Server) Watch(c Controller) { s.mu.Lock() defer s.mu.Unlock() - s.watcher = w + s.controller = c s.mul.Lock() - if s.pWatcher != nil && s.pool != nil { - s.pWatcher.Detach() - s.pWatcher = s.watcher.Attach(s.pool) + if s.pController != nil && s.pool != nil { + s.pController.Detach() + s.pController = s.controller.Attach(s.pool) } s.mul.Unlock() } @@ -91,8 +91,8 @@ func (s *Server) Start() (err error) { return err } - if s.watcher != nil { - s.pWatcher = s.watcher.Attach(s.pool) + if s.controller != nil { + s.pController = s.controller.Attach(s.pool) } s.pool.Listen(s.poolListener) @@ -113,9 +113,9 @@ func (s *Server) Stop() { s.throw(EventPoolDestruct, s.pool) - if s.pWatcher != nil { - s.pWatcher.Detach() - s.pWatcher = nil + if s.pController != nil { + s.pController.Detach() + s.pController = nil } s.pool.Destroy() @@ -157,7 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() previous := s.pool - pWatcher := s.pWatcher + pWatcher := s.pController s.mu.Unlock() pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) @@ -170,8 +170,8 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() s.cfg.Pool, s.pool = cfg.Pool, pool - if s.watcher != nil { - s.pWatcher = s.watcher.Attach(pool) + if s.controller != nil { + s.pController = s.controller.Attach(pool) } s.mu.Unlock() @@ -179,7 +179,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.throw(EventPoolConstruct, pool) if previous != nil { - go func(previous Pool, pWatcher Watcher) { + go func(previous Pool, pWatcher Controller) { s.throw(EventPoolDestruct, previous) if pWatcher != nil { pWatcher.Detach() diff --git a/service/container.go b/service/container.go index abeaf369..ea1819d8 100644 --- a/service/container.go +++ b/service/container.go @@ -16,7 +16,7 @@ var errNoConfig = fmt.Errorf("no config has been provided") // implement service.HydrateConfig. const InitMethod = "Init" -// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept +// Service can serve. Services can provide Init method which must return (bool, error) signature and might accept // other services and/or configs as dependency. type Service interface { // Serve serves. diff --git a/service/http/service.go b/service/http/service.go index 800d3ca9..b76d8893 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -33,14 +33,14 @@ type Service struct { mdwr []middleware mu sync.Mutex rr *roadrunner.Server - watcher roadrunner.Watcher + watcher roadrunner.Controller handler *Handler http *http.Server https *http.Server } // Watch attaches watcher. -func (s *Service) Watch(w roadrunner.Watcher) { +func (s *Service) Watch(w roadrunner.Controller) { s.watcher = w } diff --git a/service/watcher/config.go b/service/watcher/config.go index 74be517a..8151005d 100644 --- a/service/watcher/config.go +++ b/service/watcher/config.go @@ -8,11 +8,11 @@ import ( // Configures set of Services. type Config struct { - // Interval defines the update duration for underlying watchers, default 1s. + // Interval defines the update duration for underlying controllers, default 1s. Interval time.Duration // Services declares list of services to be watched. - Services map[string]*watcherConfig + Services map[string]*controllerConfig } // Hydrate must populate Config values using given Config source. Must return error if Config is not valid. @@ -36,13 +36,13 @@ func (c *Config) InitDefaults() error { return nil } -// Watchers returns list of defined Services -func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) { - watchers = make(map[string]roadrunner.Watcher) +// Controllers returns list of defined Services +func (c *Config) Controllers(l listener) (controllers map[string]roadrunner.Controller) { + controllers = make(map[string]roadrunner.Controller) for name, cfg := range c.Services { - watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg} + controllers[name] = &controller{lsn: l, tick: c.Interval, cfg: cfg} } - return watchers + return controllers } diff --git a/service/watcher/watcher.go b/service/watcher/controller.go index 65a2eeeb..38eddf84 100644 --- a/service/watcher/watcher.go +++ b/service/watcher/controller.go @@ -21,11 +21,11 @@ const ( EventMaxExecTTL ) -// handles watcher events +// handles controller events type listener func(event int, ctx interface{}) -// defines the watcher behaviour -type watcherConfig struct { +// defines the controller behaviour +type controllerConfig struct { // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. MaxMemory uint64 @@ -39,54 +39,54 @@ type watcherConfig struct { MaxExecTTL int64 } -type watcher struct { +type controller struct { lsn listener tick time.Duration - cfg *watcherConfig + cfg *controllerConfig // list of workers which are currently working - sw *stateWatcher + sw *stateFilter stop chan interface{} } -// watch the pool state -func (wch *watcher) watch(p roadrunner.Pool) { - wch.loadWorkers(p) +// control the pool state +func (c *controller) control(p roadrunner.Pool) { + c.loadWorkers(p) now := time.Now() - if wch.cfg.MaxExecTTL != 0 { - for _, w := range wch.sw.find( + if c.cfg.MaxExecTTL != 0 { + for _, w := range c.sw.find( roadrunner.StateWorking, - now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)), + now.Add(-time.Second*time.Duration(c.cfg.MaxExecTTL)), ) { eID := w.State().NumExecs() - err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL) + err := fmt.Errorf("max exec time reached (%vs)", c.cfg.MaxExecTTL) // make sure worker still on initial request if p.Remove(w, err) && w.State().NumExecs() == eID { go w.Kill() - wch.report(EventMaxExecTTL, w, err) + c.report(EventMaxExecTTL, w, err) } } } // locale workers which are in idle mode for too long - if wch.cfg.MaxIdleTTL != 0 { - for _, w := range wch.sw.find( + if c.cfg.MaxIdleTTL != 0 { + for _, w := range c.sw.find( roadrunner.StateReady, - now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)), + now.Add(-time.Second*time.Duration(c.cfg.MaxIdleTTL)), ) { - err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL) + err := fmt.Errorf("max idle time reached (%vs)", c.cfg.MaxIdleTTL) if p.Remove(w, err) { - wch.report(EventMaxIdleTTL, w, err) + c.report(EventMaxIdleTTL, w, err) } } } } -func (wch *watcher) loadWorkers(p roadrunner.Pool) { +func (c *controller) loadWorkers(p roadrunner.Pool) { now := time.Now() for _, w := range p.Workers() { @@ -100,52 +100,52 @@ func (wch *watcher) loadWorkers(p roadrunner.Pool) { continue } - if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) { - err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL) + if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) { + err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL) if p.Remove(w, err) { - wch.report(EventMaxTTL, w, err) + c.report(EventMaxTTL, w, err) } continue } - if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 { - err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory) + if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 { + err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory) if p.Remove(w, err) { - wch.report(EventMaxMemory, w, err) + c.report(EventMaxMemory, w, err) } continue } - // watch the worker state changes - wch.sw.push(w) + // control the worker state changes + c.sw.push(w) } - wch.sw.sync(now) + c.sw.sync(now) } -// throw watcher event -func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) { - if wch.lsn != nil { - wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) +// throw controller event +func (c *controller) report(event int, worker *roadrunner.Worker, caused error) { + if c.lsn != nil { + c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) } } -// Attach watcher to the pool -func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { - wp := &watcher{ - tick: wch.tick, - lsn: wch.lsn, - cfg: wch.cfg, - sw: newStateWatcher(), +// Attach controller to the pool +func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller { + wp := &controller{ + tick: c.tick, + lsn: c.lsn, + cfg: c.cfg, + sw: newStateFilter(), stop: make(chan interface{}), } - go func(wp *watcher, pool roadrunner.Pool) { + go func(wp *controller, pool roadrunner.Pool) { ticker := time.NewTicker(wp.tick) for { select { case <-ticker.C: - wp.watch(pool) + wp.control(pool) case <-wp.stop: return } @@ -155,7 +155,7 @@ func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { return wp } -// Detach watcher from the pool. -func (wch *watcher) Detach() { - close(wch.stop) +// Detach controller from the pool. +func (c *controller) Detach() { + close(c.stop) } diff --git a/service/watcher/service.go b/service/watcher/service.go index 0d419716..3db23b68 100644 --- a/service/watcher/service.go +++ b/service/watcher/service.go @@ -5,25 +5,25 @@ import ( "github.com/spiral/roadrunner/service" ) -// ID defines watcher service name. -const ID = "watch" +// ID defines controller service name. +const ID = "control" -// Watchable defines the ability to attach rr watcher. +// Watchable defines the ability to attach rr controller. type Watchable interface { - // Watch attaches watcher to the service. - Watch(w roadrunner.Watcher) + // Watch attaches controller to the service. + Watch(w roadrunner.Controller) } -// Services to watch the state of rr service inside other services. +// Services to control the state of rr service inside other services. type Service struct { cfg *Config lsns []func(event int, ctx interface{}) } -// Init watcher service +// Init controller service func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { // mount Services to designated services - for id, watcher := range cfg.Watchers(s.throw) { + for id, watcher := range cfg.Controllers(s.throw) { svc, _ := c.Get(id) if watchable, ok := svc.(Watchable); ok { watchable.Watch(watcher) @@ -33,7 +33,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { return true, nil } -// AddListener attaches server event watcher. +// AddListener attaches server event controller. func (s *Service) AddListener(l func(event int, ctx interface{})) { s.lsns = append(s.lsns, l) } diff --git a/service/watcher/state_watch.go b/service/watcher/state_filter.go index 3090d15d..d85f1308 100644 --- a/service/watcher/state_watch.go +++ b/service/watcher/state_filter.go @@ -5,7 +5,7 @@ import ( "time" ) -type stateWatcher struct { +type stateFilter struct { prev map[*roadrunner.Worker]state next map[*roadrunner.Worker]state } @@ -16,20 +16,20 @@ type state struct { since time.Time } -func newStateWatcher() *stateWatcher { - return &stateWatcher{ +func newStateFilter() *stateFilter { + return &stateFilter{ prev: make(map[*roadrunner.Worker]state), next: make(map[*roadrunner.Worker]state), } } // add new worker to be watched -func (sw *stateWatcher) push(w *roadrunner.Worker) { +func (sw *stateFilter) push(w *roadrunner.Worker) { sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()} } // update worker states. -func (sw *stateWatcher) sync(t time.Time) { +func (sw *stateFilter) sync(t time.Time) { for w := range sw.prev { if _, ok := sw.next[w]; !ok { delete(sw.prev, w) @@ -47,7 +47,7 @@ func (sw *stateWatcher) sync(t time.Time) { } // find all workers which spend given amount of time in a specific state. -func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) { +func (sw *stateFilter) find(state int64, since time.Time) (workers []*roadrunner.Worker) { for w, s := range sw.prev { if s.state == state && s.since.Before(since) { workers = append(workers, w) diff --git a/static_pool.go b/static_pool.go index 02960825..66b1366e 100644 --- a/static_pool.go +++ b/static_pool.go @@ -84,7 +84,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } -// Listen attaches pool event watcher. +// Listen attaches pool event controller. func (p *StaticPool) Listen(l func(event int, ctx interface{})) { p.mul.Lock() defer p.mul.Unlock() diff --git a/watcher.go b/watcher.go deleted file mode 100644 index 4527fe68..00000000 --- a/watcher.go +++ /dev/null @@ -1,10 +0,0 @@ -package roadrunner - -// Watcher observes pool state and decides if any worker must be destroyed. -type Watcher interface { - // Lock watcher on given pool instance. - Attach(p Pool) Watcher - - // Detach pool watching. - Detach() -} |