diff options
author | Wolfy-J <[email protected]> | 2019-05-03 15:44:51 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-03 15:44:51 +0300 |
commit | 28c787d66c2b74dd2300c792abd1e4f987c3d6c9 (patch) | |
tree | b9c5ef036eda3ffa16b5e87a06ce99fcd8a4d7b4 | |
parent | e9d42947a6922ce2f0aa9f9bcab4ead167735bc9 (diff) |
new watchers functionality
-rw-r--r-- | cmd/rr/cmd/root.go | 11 | ||||
-rw-r--r-- | cmd/rr/main.go | 2 | ||||
-rw-r--r-- | cmd/util/debug.go | 24 | ||||
-rw-r--r-- | pool.go | 4 | ||||
-rw-r--r-- | service/container.go | 2 | ||||
-rw-r--r-- | service/env/service.go | 2 | ||||
-rw-r--r-- | service/http/service.go | 4 | ||||
-rw-r--r-- | service/rpc/service.go | 2 | ||||
-rw-r--r-- | service/static/service.go | 2 | ||||
-rw-r--r-- | service/watcher/config.go | 55 | ||||
-rw-r--r-- | service/watcher/service.go | 46 | ||||
-rw-r--r-- | service/watcher/watcher.go | 107 | ||||
-rw-r--r-- | static_pool.go | 9 |
13 files changed, 253 insertions, 17 deletions
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index 74506004..2e170307 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -25,10 +25,11 @@ import ( "github.com/spf13/cobra" "github.com/spiral/roadrunner/cmd/util" "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/watcher" "os" ) -// Service bus for all the commands. +// Services bus for all the commands. var ( cfgFile, workDir, logFormat string override []string @@ -106,6 +107,14 @@ func init() { util.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err) os.Exit(1) } + + // global watcher config + wcv, _ := Container.Get(watcher.ID) + if wcv, ok := wcv.(*watcher.Service); ok { + wcv.AddListener(func(event int, ctx interface{}) { + util.LogEvent(Logger, event, ctx) + }) + } }) } diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 54915957..dc2fbc20 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -24,6 +24,7 @@ package main import ( rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/service/watcher" // services (plugins) "github.com/spiral/roadrunner/service/env" @@ -40,6 +41,7 @@ func main() { rr.Container.Register(rpc.ID, &rpc.Service{}) rr.Container.Register(http.ID, &http.Service{}) rr.Container.Register(static.ID, &static.Service{}) + rr.Container.Register(watcher.ID, &watcher.Service{}) // you can register additional commands using cmd.CLI rr.Execute() diff --git a/cmd/util/debug.go b/cmd/util/debug.go index f64b9bc4..54ef104f 100644 --- a/cmd/util/debug.go +++ b/cmd/util/debug.go @@ -3,6 +3,7 @@ package util import ( "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/watcher" "strings" ) @@ -12,7 +13,7 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool { case roadrunner.EventWorkerKill: w := ctx.(*roadrunner.Worker) logger.Warning(Sprintf( - "<white+hb>worker.%v</reset> <yellow>killed</red>", + "<white+hb>worker.%v</reset> <yellow>killed</reset>", *w.Pid, )) return true @@ -57,5 +58,26 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool { return true } + // watchers + switch event { + case watcher.EventMaxTTL: + w := ctx.(roadrunner.WorkerError) + logger.Debug(Sprintf( + "<white+hb>worker.%v</reset> <yellow>%s</reset>", + *w.Worker.Pid, + w.Caused, + )) + return true + + case watcher.EventMaxMemory: + w := ctx.(roadrunner.WorkerError) + logger.Warning(Sprintf( + "<white+hb>worker.%v</reset> <red>%s</reset>", + *w.Worker.Pid, + w.Caused, + )) + return true + } + return false } @@ -31,8 +31,8 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []*Worker) - // Remove forces pool to remove specific worker. - Remove(w *Worker, err error) + // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker. + Remove(w *Worker, err error) bool // Destroy all underlying workers (but let them to complete the task). Destroy() diff --git a/service/container.go b/service/container.go index 6aa99614..abeaf369 100644 --- a/service/container.go +++ b/service/container.go @@ -16,7 +16,7 @@ var errNoConfig = fmt.Errorf("no config has been provided") // implement service.HydrateConfig. const InitMethod = "Init" -// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept +// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept // other services and/or configs as dependency. type Service interface { // Serve serves. diff --git a/service/env/service.go b/service/env/service.go index 83175b36..00bc41ef 100644 --- a/service/env/service.go +++ b/service/env/service.go @@ -3,7 +3,7 @@ package env // ID contains default service name. const ID = "env" -// Service provides ability to map _ENV values from config file. +// Services provides ability to map _ENV values from config file. type Service struct { // values is default set of values. values map[string]string diff --git a/service/http/service.go b/service/http/service.go index 716499a4..651284b4 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -25,7 +25,7 @@ const ( // http middleware type. type middleware func(f http.HandlerFunc) http.HandlerFunc -// Service manages rr, http servers. +// Services manages rr, http servers. type Service struct { cfg *Config env env.Environment @@ -40,7 +40,7 @@ type Service struct { } // Watch attaches watcher. -func (s *Service) AttachWatcher(w roadrunner.Watcher) { +func (s *Service) Watch(w roadrunner.Watcher) { s.watcher = w } diff --git a/service/rpc/service.go b/service/rpc/service.go index 066124a2..ea262615 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -12,7 +12,7 @@ import ( // ID contains default service name. const ID = "rpc" -// Service is RPC service. +// Services is RPC service. type Service struct { cfg *Config stop chan interface{} diff --git a/service/static/service.go b/service/static/service.go index b824e787..679033f2 100644 --- a/service/static/service.go +++ b/service/static/service.go @@ -9,7 +9,7 @@ import ( // ID contains default service name. const ID = "static" -// Service serves static files. Potentially convert into middleware? +// Services serves static files. Potentially convert into middleware? type Service struct { // server configuration (location, forbidden files and etc) cfg *Config diff --git a/service/watcher/config.go b/service/watcher/config.go new file mode 100644 index 00000000..dcd31777 --- /dev/null +++ b/service/watcher/config.go @@ -0,0 +1,55 @@ +package watcher + +import ( + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "time" +) + +// Configures set of Services. +type Config struct { + // Interval defines the update duration for underlying watchers, default 1s. + Interval time.Duration + + // Services declares list of services to be watched. + Services map[string]*watcherConfig +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + // Always use second based definition for time durations + if c.Interval < time.Microsecond { + c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds()) + } + + for name, cfg := range c.Services { + if err := cfg.Normalize(); err != nil { + return fmt.Errorf("invalid watcher `%s`: %s", name, err.Error()) + } + } + + return nil +} + +// InitDefaults sets missing values to their default values. +func (c *Config) InitDefaults() error { + c.Interval = time.Second + + return nil +} + +// Watchers returns list of defined Services +func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) { + watchers = make(map[string]roadrunner.Watcher) + + for name, cfg := range c.Services { + watchers[name] = &watcher{lsn: l, interval: c.Interval, cfg: cfg} + } + + return watchers +} diff --git a/service/watcher/service.go b/service/watcher/service.go new file mode 100644 index 00000000..c81ff3f5 --- /dev/null +++ b/service/watcher/service.go @@ -0,0 +1,46 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" +) + +// ID defines watcher service name. +const ID = "watch" + +// Watchable defines the ability to attach roadrunner watcher. +type Watchable interface { + // Watch attaches watcher to the service. + Watch(w roadrunner.Watcher) +} + +// Services to watch the state of roadrunner service inside other services. +type Service struct { + cfg *Config + lsns []func(event int, ctx interface{}) +} + +// Init watcher service +func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { + // mount Services to designated services + for id, watcher := range cfg.Watchers(s.throw) { + svc, _ := c.Get(id) + if watchable, ok := svc.(Watchable); ok { + watchable.Watch(watcher) + } + } + + return true, nil +} + +// AddListener attaches server event watcher. +func (s *Service) AddListener(l func(event int, ctx interface{})) { + s.lsns = append(s.lsns, l) +} + +// throw handles service, server and pool events. +func (s *Service) throw(event int, ctx interface{}) { + for _, l := range s.lsns { + l(event, ctx) + } +} diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go index 10c642cb..9965ca6e 100644 --- a/service/watcher/watcher.go +++ b/service/watcher/watcher.go @@ -1,9 +1,106 @@ package watcher -import "github.com/spiral/roadrunner" +import ( + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" + "time" +) -// Watchable defines the ability to attach roadrunner watcher. -type Watchable interface { - // Watch attaches watcher to the service. - Watch(w roadrunner.Watcher) +const ( + // EventMaxTTL thrown when worker is removed due MaxTTL being reached. Context is roadrunner.WorkerError + EventMaxTTL = iota + 8000 + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory +) + +// handles watcher events +type listener func(event int, ctx interface{}) + +// defines the watcher behaviour +type watcherConfig struct { + // MaxTTL defines maximum time worker is allowed to live. + MaxTTL time.Duration + + // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. + MaxMemory uint64 +} + +// Normalize watcher config and upscale the durations. +func (c *watcherConfig) Normalize() error { + // Always use second based definition for time durations + if c.MaxTTL < time.Microsecond { + c.MaxTTL = time.Second * time.Duration(c.MaxTTL.Nanoseconds()) + } + + return nil +} + +type watcher struct { + lsn listener + interval time.Duration + cfg *watcherConfig + stop chan interface{} +} + +// watch the pool state +func (watch *watcher) watch(p roadrunner.Pool) { + now := time.Now() + for _, w := range p.Workers() { + if watch.cfg.MaxTTL != 0 && now.Sub(w.Created) >= watch.cfg.MaxTTL { + err := fmt.Errorf("max TTL reached (%s)", watch.cfg.MaxTTL) + if p.Remove(w, err) { + watch.report(EventMaxTTL, w, err) + } + } + + state, err := util.WorkerState(w) + if err != nil { + continue + } + + if watch.cfg.MaxMemory != 0 && state.MemoryUsage >= watch.cfg.MaxMemory*1024*1024 { + err := fmt.Errorf("max allowed memory reached (%vMB)", watch.cfg.MaxMemory) + if p.Remove(w, err) { + watch.report(EventMaxMemory, w, err) + } + } + } +} + +// throw watcher event +func (watch *watcher) report(event int, worker *roadrunner.Worker, caused error) { + if watch.lsn != nil { + watch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) + } +} + +// Attach watcher to the pool +func (watch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { + wp := &watcher{ + interval: watch.interval, + lsn: watch.lsn, + cfg: watch.cfg, + stop: make(chan interface{}), + } + + go func(wp *watcher, pool roadrunner.Pool) { + ticker := time.NewTicker(wp.interval) + for { + select { + case <-ticker.C: + wp.watch(pool) + case <-wp.stop: + return + } + } + }(wp, pool) + + return wp +} + +// Detach watcher from the pool. +func (watch *watcher) Detach() { + close(watch.stop) } diff --git a/static_pool.go b/static_pool.go index 5f60099a..02960825 100644 --- a/static_pool.go +++ b/static_pool.go @@ -116,13 +116,18 @@ func (p *StaticPool) Workers() (workers []*Worker) { } // Remove forces pool to remove specific worker. -func (p *StaticPool) Remove(w *Worker, err error) { +func (p *StaticPool) Remove(w *Worker, err error) bool { if w.State().Value() != StateReady && w.State().Value() != StateWorking { // unable to remove inactive worker - return + return false + } + + if _, ok := p.remove.Load(w); ok { + return false } p.remove.Store(w, err) + return true } // Exec one task with given payload and context, returns result or error. |