author    Wolfy-J <[email protected]>  2019-05-04 19:24:25 +0300
committer Wolfy-J <[email protected]>  2019-05-04 19:24:25 +0300
commit    2efc533f2aac215d487a80020b0f9bf4ae5209c3 (patch)
tree      a80a7a74dc4ca8c290b8b1bf1f6d24535b5ae3d7
parent    726b31008e73ab83d0582305c28a8cf62322e47a (diff)
watchers renamed to controllers
-rw-r--r--  CHANGELOG.md                                                                    2
-rw-r--r--  cmd/util/cprint.go                                                              4
-rw-r--r--  controller.go                                                                  10
-rw-r--r--  controller_test.go (renamed from watcher_test.go)                              28
-rw-r--r--  pool.go                                                                         2
-rw-r--r--  server.go                                                                      42
-rw-r--r--  service/container.go                                                            2
-rw-r--r--  service/http/service.go                                                         4
-rw-r--r--  service/watcher/config.go                                                      14
-rw-r--r--  service/watcher/controller.go (renamed from service/watcher/watcher.go)        90
-rw-r--r--  service/watcher/service.go                                                     18
-rw-r--r--  service/watcher/state_filter.go (renamed from service/watcher/state_watch.go)  12
-rw-r--r--  static_pool.go                                                                  2
-rw-r--r--  watcher.go                                                                     10
14 files changed, 121 insertions, 119 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b6f380b..59091bc8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,7 +7,7 @@ v1.4.0
- the ability to safely remove the worker from the pool in runtime
- minor performance improvements
- `real ip` resolution using X-Real-Ip and X-Forwarded-For (+cidr verification)
-- automatic worker lifecycle manager (watcher)
+- automatic worker lifecycle manager (controller)
- maxMemory (graceful stop)
- maxTTL (graceful stop)
- maxIdleTTL (graceful stop)
diff --git a/cmd/util/cprint.go b/cmd/util/cprint.go
index 5bf8a8ca..9990a038 100644
--- a/cmd/util/cprint.go
+++ b/cmd/util/cprint.go
@@ -8,7 +8,9 @@ import (
)
var (
- reg *regexp.Regexp
+ reg *regexp.Regexp
+
+	// Colorize enables color support.
Colorize bool
)
diff --git a/controller.go b/controller.go
new file mode 100644
index 00000000..bda7ad6b
--- /dev/null
+++ b/controller.go
@@ -0,0 +1,10 @@
+package roadrunner
+
+// Controller observes pool state and decides if any worker must be destroyed.
+type Controller interface {
+ // Lock controller on given pool instance.
+ Attach(p Pool) Controller
+
+ // Detach pool watching.
+ Detach()
+}
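For reference, a minimal sketch of what an implementation of this interface can look like. The `nopController` type below is hypothetical (it is not part of this commit); it only illustrates the contract that `Attach` returns a new controller locked on the given pool while the receiver stays reusable as a prototype.

```go
package roadrunner

// nopController is a hypothetical, do-nothing Controller used only to
// illustrate the interface contract above.
type nopController struct {
	p Pool // pool this instance is locked on; nil for the prototype
}

// Attach returns a fresh controller instance bound to the given pool,
// leaving the receiver free to be attached to other pools.
func (c *nopController) Attach(p Pool) Controller {
	return &nopController{p: p}
}

// Detach stops pool watching.
func (c *nopController) Detach() {}
```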
diff --git a/watcher_test.go b/controller_test.go
index ac0523ac..031c2f31 100644
--- a/watcher_test.go
+++ b/controller_test.go
@@ -14,7 +14,7 @@ type eWatcher struct {
onDetach func(p Pool)
}
-func (w *eWatcher) Attach(p Pool) Watcher {
+func (w *eWatcher) Attach(p Pool) Controller {
wp := &eWatcher{p: p, onAttach: w.onAttach, onDetach: w.onDetach}
if wp.onAttach != nil {
@@ -50,8 +50,8 @@ func Test_WatcherWatch(t *testing.T) {
rr.Watch(&eWatcher{})
assert.NoError(t, rr.Start())
- assert.NotNil(t, rr.pWatcher)
- assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
res, err := rr.Exec(&Payload{Body: []byte("hello")})
@@ -79,16 +79,16 @@ func Test_WatcherReattach(t *testing.T) {
rr.Watch(&eWatcher{})
assert.NoError(t, rr.Start())
- assert.NotNil(t, rr.pWatcher)
- assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
- oldWatcher := rr.pWatcher
+ oldWatcher := rr.pController
assert.NoError(t, rr.Reset())
- assert.NotNil(t, rr.pWatcher)
- assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
- assert.NotEqual(t, oldWatcher, rr.pWatcher)
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
+ assert.NotEqual(t, oldWatcher, rr.pController)
res, err := rr.Exec(&Payload{Body: []byte("hello")})
@@ -125,8 +125,8 @@ func Test_WatcherAttachDetachSequence(t *testing.T) {
})
assert.NoError(t, rr.Start())
- assert.NotNil(t, rr.pWatcher)
- assert.Equal(t, rr.pWatcher.(*eWatcher).p, rr.pool)
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
res, err := rr.Exec(&Payload{Body: []byte("hello")})
@@ -161,7 +161,7 @@ func Test_RemoveWorkerOnAllocation(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
lastPid := res.String()
- rr.pWatcher.(*eWatcher).remove(wr, nil)
+ rr.pController.(*eWatcher).remove(wr, nil)
res, err = rr.Exec(&Payload{Body: []byte("hello")})
assert.NoError(t, err)
@@ -204,7 +204,7 @@ func Test_RemoveWorkerAfterTask(t *testing.T) {
// wait for worker execution to be in progress
time.Sleep(time.Millisecond * 250)
- rr.pWatcher.(*eWatcher).remove(wr, nil)
+ rr.pController.(*eWatcher).remove(wr, nil)
<-wait
@@ -212,5 +212,5 @@ func Test_RemoveWorkerAfterTask(t *testing.T) {
assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0]))
// must not be registered within the pool
- rr.pWatcher.(*eWatcher).remove(wr, nil)
+ rr.pController.(*eWatcher).remove(wr, nil)
}
diff --git a/pool.go b/pool.go
index 23857604..d863e96f 100644
--- a/pool.go
+++ b/pool.go
@@ -22,7 +22,7 @@ const (
// Pool manages a set of inner worker processes.
type Pool interface {
- // Listen all caused events to attached watcher.
+ // Listen all caused events to attached controller.
Listen(l func(event int, ctx interface{}))
// Exec one task with given payload and context, returns result or error.
diff --git a/server.go b/server.go
index 397898f2..3f1f48fc 100644
--- a/server.go
+++ b/server.go
@@ -37,13 +37,13 @@ type Server struct {
// creates and connects to workers
factory Factory
- // associated pool watcher
- watcher Watcher
+ // associated pool controller
+ controller Controller
// currently active pool instance
- mup sync.Mutex
- pool Pool
- pWatcher Watcher
+ mup sync.Mutex
+ pool Pool
+ pController Controller
// observes pool events (can be attached to multiple pools at the same time)
mul sync.Mutex
@@ -55,7 +55,7 @@ func NewServer(cfg *ServerConfig) *Server {
return &Server{cfg: cfg}
}
-// Listen attaches server event watcher.
+// Listen attaches server event controller.
func (s *Server) Listen(l func(event int, ctx interface{})) {
s.mul.Lock()
defer s.mul.Unlock()
@@ -63,17 +63,17 @@ func (s *Server) Listen(l func(event int, ctx interface{})) {
s.lsn = l
}
-// Listen attaches server event watcher.
-func (s *Server) Watch(w Watcher) {
+// Watch attaches worker controller.
+func (s *Server) Watch(c Controller) {
s.mu.Lock()
defer s.mu.Unlock()
- s.watcher = w
+ s.controller = c
s.mul.Lock()
- if s.pWatcher != nil && s.pool != nil {
- s.pWatcher.Detach()
- s.pWatcher = s.watcher.Attach(s.pool)
+ if s.pController != nil && s.pool != nil {
+ s.pController.Detach()
+ s.pController = s.controller.Attach(s.pool)
}
s.mul.Unlock()
}
@@ -91,8 +91,8 @@ func (s *Server) Start() (err error) {
return err
}
- if s.watcher != nil {
- s.pWatcher = s.watcher.Attach(s.pool)
+ if s.controller != nil {
+ s.pController = s.controller.Attach(s.pool)
}
s.pool.Listen(s.poolListener)
@@ -113,9 +113,9 @@ func (s *Server) Stop() {
s.throw(EventPoolDestruct, s.pool)
- if s.pWatcher != nil {
- s.pWatcher.Detach()
- s.pWatcher = nil
+ if s.pController != nil {
+ s.pController.Detach()
+ s.pController = nil
}
s.pool.Destroy()
@@ -157,7 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
previous := s.pool
- pWatcher := s.pWatcher
+ pWatcher := s.pController
s.mu.Unlock()
pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
@@ -170,8 +170,8 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
s.cfg.Pool, s.pool = cfg.Pool, pool
- if s.watcher != nil {
- s.pWatcher = s.watcher.Attach(pool)
+ if s.controller != nil {
+ s.pController = s.controller.Attach(pool)
}
s.mu.Unlock()
@@ -179,7 +179,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.throw(EventPoolConstruct, pool)
if previous != nil {
- go func(previous Pool, pWatcher Watcher) {
+ go func(previous Pool, pWatcher Controller) {
s.throw(EventPoolDestruct, previous)
if pWatcher != nil {
pWatcher.Detach()
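The attach/detach choreography above (attach on `Start`, re-attach on `Watch` and `Reconfigure`, detach on `Stop`) can be exercised end to end. A sketch, assuming the PHP worker script used by this repository's tests; the `noopController` type is hypothetical:

```go
package main

import (
	"time"

	"github.com/spiral/roadrunner"
)

// noopController is a hypothetical stand-in for a real controller such as
// the one provided by service/watcher.
type noopController struct{}

func (c *noopController) Attach(p roadrunner.Pool) roadrunner.Controller {
	return &noopController{}
}

func (c *noopController) Detach() {}

func main() {
	srv := roadrunner.NewServer(&roadrunner.ServerConfig{
		Command: "php tests/client.php echo pipes",
		Relay:   "pipes",
		Pool: &roadrunner.Config{
			NumWorkers:      1,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	})
	defer srv.Stop() // Stop detaches the controller before destroying the pool

	// Registering the controller before Start lets Start attach it to the
	// freshly created pool; calling Watch on a running server re-attaches.
	srv.Watch(&noopController{})

	if err := srv.Start(); err != nil {
		panic(err)
	}
}
```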
diff --git a/service/container.go b/service/container.go
index abeaf369..ea1819d8 100644
--- a/service/container.go
+++ b/service/container.go
@@ -16,7 +16,7 @@ var errNoConfig = fmt.Errorf("no config has been provided")
// implement service.HydrateConfig.
const InitMethod = "Init"
-// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept
+// Service can serve. Services can provide Init method which must return (bool, error) signature and might accept
// other services and/or configs as dependency.
type Service interface {
// Serve serves.
diff --git a/service/http/service.go b/service/http/service.go
index 800d3ca9..b76d8893 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -33,14 +33,14 @@ type Service struct {
mdwr []middleware
mu sync.Mutex
rr *roadrunner.Server
- watcher roadrunner.Watcher
+ watcher roadrunner.Controller
handler *Handler
http *http.Server
https *http.Server
}
// Watch attaches watcher.
-func (s *Service) Watch(w roadrunner.Watcher) {
+func (s *Service) Watch(w roadrunner.Controller) {
s.watcher = w
}
diff --git a/service/watcher/config.go b/service/watcher/config.go
index 74be517a..8151005d 100644
--- a/service/watcher/config.go
+++ b/service/watcher/config.go
@@ -8,11 +8,11 @@ import (
// Configures set of Services.
type Config struct {
- // Interval defines the update duration for underlying watchers, default 1s.
+ // Interval defines the update duration for underlying controllers, default 1s.
Interval time.Duration
// Services declares list of services to be watched.
- Services map[string]*watcherConfig
+ Services map[string]*controllerConfig
}
// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
@@ -36,13 +36,13 @@ func (c *Config) InitDefaults() error {
return nil
}
-// Watchers returns list of defined Services
-func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) {
- watchers = make(map[string]roadrunner.Watcher)
+// Controllers returns list of defined Services
+func (c *Config) Controllers(l listener) (controllers map[string]roadrunner.Controller) {
+ controllers = make(map[string]roadrunner.Controller)
for name, cfg := range c.Services {
- watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg}
+ controllers[name] = &controller{lsn: l, tick: c.Interval, cfg: cfg}
}
- return watchers
+ return controllers
}
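`Controllers` fans a single `Config` out into one controller per watched service, all sharing the tick interval and event listener. A hypothetical, package-internal sketch (in a real deployment these values arrive through config hydration rather than a literal):

```go
package watcher

import "time"

// exampleControllers shows how a populated Config yields controllers; the
// service name and limits below are made-up illustration values.
func exampleControllers() {
	cfg := &Config{
		Interval: time.Second,
		Services: map[string]*controllerConfig{
			"http": {
				MaxMemory:  100, // megabytes
				MaxIdleTTL: 30,  // seconds
				MaxExecTTL: 60,  // seconds
			},
		},
	}

	for name, c := range cfg.Controllers(func(event int, ctx interface{}) {}) {
		_ = name // watched service name, e.g. "http"
		_ = c    // a roadrunner.Controller ready for Watchable.Watch
	}
}
```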
diff --git a/service/watcher/watcher.go b/service/watcher/controller.go
index 65a2eeeb..38eddf84 100644
--- a/service/watcher/watcher.go
+++ b/service/watcher/controller.go
@@ -21,11 +21,11 @@ const (
EventMaxExecTTL
)
-// handles watcher events
+// handles controller events
type listener func(event int, ctx interface{})
-// defines the watcher behaviour
-type watcherConfig struct {
+// defines the controller behaviour
+type controllerConfig struct {
// MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
MaxMemory uint64
@@ -39,54 +39,54 @@ type watcherConfig struct {
MaxExecTTL int64
}
-type watcher struct {
+type controller struct {
lsn listener
tick time.Duration
- cfg *watcherConfig
+ cfg *controllerConfig
// list of workers which are currently working
- sw *stateWatcher
+ sw *stateFilter
stop chan interface{}
}
-// watch the pool state
-func (wch *watcher) watch(p roadrunner.Pool) {
- wch.loadWorkers(p)
+// control the pool state
+func (c *controller) control(p roadrunner.Pool) {
+ c.loadWorkers(p)
now := time.Now()
- if wch.cfg.MaxExecTTL != 0 {
- for _, w := range wch.sw.find(
+ if c.cfg.MaxExecTTL != 0 {
+ for _, w := range c.sw.find(
roadrunner.StateWorking,
- now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+ now.Add(-time.Second*time.Duration(c.cfg.MaxExecTTL)),
) {
eID := w.State().NumExecs()
- err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+ err := fmt.Errorf("max exec time reached (%vs)", c.cfg.MaxExecTTL)
// make sure worker still on initial request
if p.Remove(w, err) && w.State().NumExecs() == eID {
go w.Kill()
- wch.report(EventMaxExecTTL, w, err)
+ c.report(EventMaxExecTTL, w, err)
}
}
}
// locate workers which are in idle mode for too long
- if wch.cfg.MaxIdleTTL != 0 {
- for _, w := range wch.sw.find(
+ if c.cfg.MaxIdleTTL != 0 {
+ for _, w := range c.sw.find(
roadrunner.StateReady,
- now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+ now.Add(-time.Second*time.Duration(c.cfg.MaxIdleTTL)),
) {
- err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
+ err := fmt.Errorf("max idle time reached (%vs)", c.cfg.MaxIdleTTL)
if p.Remove(w, err) {
- wch.report(EventMaxIdleTTL, w, err)
+ c.report(EventMaxIdleTTL, w, err)
}
}
}
}
-func (wch *watcher) loadWorkers(p roadrunner.Pool) {
+func (c *controller) loadWorkers(p roadrunner.Pool) {
now := time.Now()
for _, w := range p.Workers() {
@@ -100,52 +100,52 @@ func (wch *watcher) loadWorkers(p roadrunner.Pool) {
continue
}
- if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
- err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
+ if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) {
+ err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL)
if p.Remove(w, err) {
- wch.report(EventMaxTTL, w, err)
+ c.report(EventMaxTTL, w, err)
}
continue
}
- if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
- err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
+ if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory)
if p.Remove(w, err) {
- wch.report(EventMaxMemory, w, err)
+ c.report(EventMaxMemory, w, err)
}
continue
}
- // watch the worker state changes
- wch.sw.push(w)
+ // control the worker state changes
+ c.sw.push(w)
}
- wch.sw.sync(now)
+ c.sw.sync(now)
}
-// throw watcher event
-func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
- if wch.lsn != nil {
- wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+// throw controller event
+func (c *controller) report(event int, worker *roadrunner.Worker, caused error) {
+ if c.lsn != nil {
+ c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
}
}
-// Attach watcher to the pool
-func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
- wp := &watcher{
- tick: wch.tick,
- lsn: wch.lsn,
- cfg: wch.cfg,
- sw: newStateWatcher(),
+// Attach controller to the pool
+func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller {
+ wp := &controller{
+ tick: c.tick,
+ lsn: c.lsn,
+ cfg: c.cfg,
+ sw: newStateFilter(),
stop: make(chan interface{}),
}
- go func(wp *watcher, pool roadrunner.Pool) {
+ go func(wp *controller, pool roadrunner.Pool) {
ticker := time.NewTicker(wp.tick)
for {
select {
case <-ticker.C:
- wp.watch(pool)
+ wp.control(pool)
case <-wp.stop:
return
}
@@ -155,7 +155,7 @@ func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
return wp
}
-// Detach watcher from the pool.
-func (wch *watcher) Detach() {
- close(wch.stop)
+// Detach controller from the pool.
+func (c *controller) Detach() {
+ close(c.stop)
}
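The TTL checks above all reduce to the same cutoff arithmetic: a worker is overdue when the timestamp of its current state lies before `now - TTL`. A standalone sketch with hypothetical numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors controllerConfig.MaxExecTTL; 60 is a made-up value.
	maxExecTTL := int64(60) // seconds

	now := time.Now()
	cutoff := now.Add(-time.Second * time.Duration(maxExecTTL))

	// A worker that entered StateWorking 90 seconds ago predates the
	// cutoff, so control() would remove and kill it.
	enteredWorking := now.Add(-90 * time.Second)
	fmt.Println("overdue:", enteredWorking.Before(cutoff)) // overdue: true
}
```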
diff --git a/service/watcher/service.go b/service/watcher/service.go
index 0d419716..3db23b68 100644
--- a/service/watcher/service.go
+++ b/service/watcher/service.go
@@ -5,25 +5,25 @@ import (
"github.com/spiral/roadrunner/service"
)
-// ID defines watcher service name.
-const ID = "watch"
+// ID defines controller service name.
+const ID = "control"
-// Watchable defines the ability to attach rr watcher.
+// Watchable defines the ability to attach rr controller.
type Watchable interface {
- // Watch attaches watcher to the service.
- Watch(w roadrunner.Watcher)
+ // Watch attaches controller to the service.
+ Watch(w roadrunner.Controller)
}
-// Services to watch the state of rr service inside other services.
+// Services to control the state of rr service inside other services.
type Service struct {
cfg *Config
lsns []func(event int, ctx interface{})
}
-// Init watcher service
+// Init controller service
func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
// mount Services to designated services
- for id, watcher := range cfg.Watchers(s.throw) {
+ for id, watcher := range cfg.Controllers(s.throw) {
svc, _ := c.Get(id)
if watchable, ok := svc.(Watchable); ok {
watchable.Watch(watcher)
@@ -33,7 +33,7 @@ func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
return true, nil
}
-// AddListener attaches server event watcher.
+// AddListener attaches server event controller.
func (s *Service) AddListener(l func(event int, ctx interface{})) {
s.lsns = append(s.lsns, l)
}
diff --git a/service/watcher/state_watch.go b/service/watcher/state_filter.go
index 3090d15d..d85f1308 100644
--- a/service/watcher/state_watch.go
+++ b/service/watcher/state_filter.go
@@ -5,7 +5,7 @@ import (
"time"
)
-type stateWatcher struct {
+type stateFilter struct {
prev map[*roadrunner.Worker]state
next map[*roadrunner.Worker]state
}
@@ -16,20 +16,20 @@ type state struct {
since time.Time
}
-func newStateWatcher() *stateWatcher {
- return &stateWatcher{
+func newStateFilter() *stateFilter {
+ return &stateFilter{
prev: make(map[*roadrunner.Worker]state),
next: make(map[*roadrunner.Worker]state),
}
}
// add new worker to be watched
-func (sw *stateWatcher) push(w *roadrunner.Worker) {
+func (sw *stateFilter) push(w *roadrunner.Worker) {
sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
}
// update worker states.
-func (sw *stateWatcher) sync(t time.Time) {
+func (sw *stateFilter) sync(t time.Time) {
for w := range sw.prev {
if _, ok := sw.next[w]; !ok {
delete(sw.prev, w)
@@ -47,7 +47,7 @@ func (sw *stateWatcher) sync(t time.Time) {
}
// find all workers which spend given amount of time in a specific state.
-func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+func (sw *stateFilter) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
for w, s := range sw.prev {
if s.state == state && s.since.Before(since) {
workers = append(workers, w)
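The renamed `stateFilter` keeps two snapshots: `push` fills `next` on every tick, and `sync` reconciles it into `prev`, refreshing `since` only when a worker's state (or exec counter) actually changed. That is what lets `find` measure time spent in a state. A simplified, self-contained sketch of the pattern, with string keys standing in for `*roadrunner.Worker`:

```go
package main

import (
	"fmt"
	"time"
)

type snap struct {
	state int64
	since time.Time // refreshed only when state changes
}

// sync mirrors stateFilter.sync in simplified form: drop vanished keys,
// restart the clock for changed ones, keep `since` for unchanged ones.
func sync(prev, next map[string]snap, t time.Time) {
	for k := range prev {
		if _, ok := next[k]; !ok {
			delete(prev, k) // worker disappeared between ticks
		}
	}
	for k, n := range next {
		if p, ok := prev[k]; !ok || p.state != n.state {
			n.since = t
			prev[k] = n
		}
		delete(next, k)
	}
}

func main() {
	prev := map[string]snap{}
	t0 := time.Now()

	sync(prev, map[string]snap{"w1": {state: 1}}, t0)
	sync(prev, map[string]snap{"w1": {state: 1}}, t0.Add(time.Second))

	// since is still t0: the state never changed, so w1 has now spent a
	// full second in state 1 — exactly what find() queries for.
	fmt.Println(prev["w1"].since.Equal(t0)) // true
}
```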
diff --git a/static_pool.go b/static_pool.go
index 02960825..66b1366e 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -84,7 +84,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Listen attaches pool event watcher.
+// Listen attaches pool event controller.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
p.mul.Lock()
defer p.mul.Unlock()
diff --git a/watcher.go b/watcher.go
deleted file mode 100644
index 4527fe68..00000000
--- a/watcher.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package roadrunner
-
-// Watcher observes pool state and decides if any worker must be destroyed.
-type Watcher interface {
- // Lock watcher on given pool instance.
- Attach(p Pool) Watcher
-
- // Detach pool watching.
- Detach()
-}