summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 15:44:51 +0300
committerWolfy-J <[email protected]>2019-05-03 15:44:51 +0300
commit28c787d66c2b74dd2300c792abd1e4f987c3d6c9 (patch)
treeb9c5ef036eda3ffa16b5e87a06ce99fcd8a4d7b4
parente9d42947a6922ce2f0aa9f9bcab4ead167735bc9 (diff)
new watchers functionality
-rw-r--r--cmd/rr/cmd/root.go11
-rw-r--r--cmd/rr/main.go2
-rw-r--r--cmd/util/debug.go24
-rw-r--r--pool.go4
-rw-r--r--service/container.go2
-rw-r--r--service/env/service.go2
-rw-r--r--service/http/service.go4
-rw-r--r--service/rpc/service.go2
-rw-r--r--service/static/service.go2
-rw-r--r--service/watcher/config.go55
-rw-r--r--service/watcher/service.go46
-rw-r--r--service/watcher/watcher.go107
-rw-r--r--static_pool.go9
13 files changed, 253 insertions, 17 deletions
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index 74506004..2e170307 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -25,10 +25,11 @@ import (
"github.com/spf13/cobra"
"github.com/spiral/roadrunner/cmd/util"
"github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/watcher"
"os"
)
-// Service bus for all the commands.
+// Services bus for all the commands.
var (
cfgFile, workDir, logFormat string
override []string
@@ -106,6 +107,14 @@ func init() {
util.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
os.Exit(1)
}
+
+ // global watcher config
+ wcv, _ := Container.Get(watcher.ID)
+ if wcv, ok := wcv.(*watcher.Service); ok {
+ wcv.AddListener(func(event int, ctx interface{}) {
+ util.LogEvent(Logger, event, ctx)
+ })
+ }
})
}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 54915957..dc2fbc20 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -24,6 +24,7 @@ package main
import (
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/service/watcher"
// services (plugins)
"github.com/spiral/roadrunner/service/env"
@@ -40,6 +41,7 @@ func main() {
rr.Container.Register(rpc.ID, &rpc.Service{})
rr.Container.Register(http.ID, &http.Service{})
rr.Container.Register(static.ID, &static.Service{})
+ rr.Container.Register(watcher.ID, &watcher.Service{})
// you can register additional commands using cmd.CLI
rr.Execute()
diff --git a/cmd/util/debug.go b/cmd/util/debug.go
index f64b9bc4..54ef104f 100644
--- a/cmd/util/debug.go
+++ b/cmd/util/debug.go
@@ -3,6 +3,7 @@ package util
import (
"github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/watcher"
"strings"
)
@@ -12,7 +13,7 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
case roadrunner.EventWorkerKill:
w := ctx.(*roadrunner.Worker)
logger.Warning(Sprintf(
- "<white+hb>worker.%v</reset> <yellow>killed</red>",
+ "<white+hb>worker.%v</reset> <yellow>killed</reset>",
*w.Pid,
))
return true
@@ -57,5 +58,26 @@ func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
return true
}
+ // watchers
+ switch event {
+ case watcher.EventMaxTTL:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Debug(Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+
+ case watcher.EventMaxMemory:
+ w := ctx.(roadrunner.WorkerError)
+ logger.Warning(Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *w.Worker.Pid,
+ w.Caused,
+ ))
+ return true
+ }
+
return false
}
diff --git a/pool.go b/pool.go
index 13e99fa8..23857604 100644
--- a/pool.go
+++ b/pool.go
@@ -31,8 +31,8 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []*Worker)
- // Remove forces pool to remove specific worker.
- Remove(w *Worker, err error)
+ // Remove forces pool to remove specific worker. Return true is this is first remove request on given worker.
+ Remove(w *Worker, err error) bool
// Destroy all underlying workers (but let them to complete the task).
Destroy()
diff --git a/service/container.go b/service/container.go
index 6aa99614..abeaf369 100644
--- a/service/container.go
+++ b/service/container.go
@@ -16,7 +16,7 @@ var errNoConfig = fmt.Errorf("no config has been provided")
// implement service.HydrateConfig.
const InitMethod = "Init"
-// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept
+// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept
// other services and/or configs as dependency.
type Service interface {
// Serve serves.
diff --git a/service/env/service.go b/service/env/service.go
index 83175b36..00bc41ef 100644
--- a/service/env/service.go
+++ b/service/env/service.go
@@ -3,7 +3,7 @@ package env
// ID contains default service name.
const ID = "env"
-// Service provides ability to map _ENV values from config file.
+// Services provides ability to map _ENV values from config file.
type Service struct {
// values is default set of values.
values map[string]string
diff --git a/service/http/service.go b/service/http/service.go
index 716499a4..651284b4 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -25,7 +25,7 @@ const (
// http middleware type.
type middleware func(f http.HandlerFunc) http.HandlerFunc
-// Service manages rr, http servers.
+// Services manages rr, http servers.
type Service struct {
cfg *Config
env env.Environment
@@ -40,7 +40,7 @@ type Service struct {
}
// Watch attaches watcher.
-func (s *Service) AttachWatcher(w roadrunner.Watcher) {
+func (s *Service) Watch(w roadrunner.Watcher) {
s.watcher = w
}
diff --git a/service/rpc/service.go b/service/rpc/service.go
index 066124a2..ea262615 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -12,7 +12,7 @@ import (
// ID contains default service name.
const ID = "rpc"
-// Service is RPC service.
+// Services is RPC service.
type Service struct {
cfg *Config
stop chan interface{}
diff --git a/service/static/service.go b/service/static/service.go
index b824e787..679033f2 100644
--- a/service/static/service.go
+++ b/service/static/service.go
@@ -9,7 +9,7 @@ import (
// ID contains default service name.
const ID = "static"
-// Service serves static files. Potentially convert into middleware?
+// Services serves static files. Potentially convert into middleware?
type Service struct {
// server configuration (location, forbidden files and etc)
cfg *Config
diff --git a/service/watcher/config.go b/service/watcher/config.go
new file mode 100644
index 00000000..dcd31777
--- /dev/null
+++ b/service/watcher/config.go
@@ -0,0 +1,55 @@
+package watcher
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Configures set of Services.
+type Config struct {
+ // Interval defines the update duration for underlying watchers, default 1s.
+ Interval time.Duration
+
+ // Services declares list of services to be watched.
+ Services map[string]*watcherConfig
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ // Always use second based definition for time durations
+ if c.Interval < time.Microsecond {
+ c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds())
+ }
+
+ for name, cfg := range c.Services {
+ if err := cfg.Normalize(); err != nil {
+ return fmt.Errorf("invalid watcher `%s`: %s", name, err.Error())
+ }
+ }
+
+ return nil
+}
+
+// InitDefaults sets missing values to their default values.
+func (c *Config) InitDefaults() error {
+ c.Interval = time.Second
+
+ return nil
+}
+
+// Watchers returns list of defined Services
+func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) {
+ watchers = make(map[string]roadrunner.Watcher)
+
+ for name, cfg := range c.Services {
+ watchers[name] = &watcher{lsn: l, interval: c.Interval, cfg: cfg}
+ }
+
+ return watchers
+}
diff --git a/service/watcher/service.go b/service/watcher/service.go
new file mode 100644
index 00000000..c81ff3f5
--- /dev/null
+++ b/service/watcher/service.go
@@ -0,0 +1,46 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+)
+
+// ID defines watcher service name.
+const ID = "watch"
+
+// Watchable defines the ability to attach roadrunner watcher.
+type Watchable interface {
+ // Watch attaches watcher to the service.
+ Watch(w roadrunner.Watcher)
+}
+
+// Services to watch the state of roadrunner service inside other services.
+type Service struct {
+ cfg *Config
+ lsns []func(event int, ctx interface{})
+}
+
+// Init watcher service
+func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
+ // mount Services to designated services
+ for id, watcher := range cfg.Watchers(s.throw) {
+ svc, _ := c.Get(id)
+ if watchable, ok := svc.(Watchable); ok {
+ watchable.Watch(watcher)
+ }
+ }
+
+ return true, nil
+}
+
+// AddListener attaches server event watcher.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.lsns = append(s.lsns, l)
+}
+
+// throw handles service, server and pool events.
+func (s *Service) throw(event int, ctx interface{}) {
+ for _, l := range s.lsns {
+ l(event, ctx)
+ }
+}
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
index 10c642cb..9965ca6e 100644
--- a/service/watcher/watcher.go
+++ b/service/watcher/watcher.go
@@ -1,9 +1,106 @@
package watcher
-import "github.com/spiral/roadrunner"
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
+ "time"
+)
-// Watchable defines the ability to attach roadrunner watcher.
-type Watchable interface {
- // Watch attaches watcher to the service.
- Watch(w roadrunner.Watcher)
+const (
+ // EventMaxTTL thrown when worker is removed due MaxTTL being reached. Context is roadrunner.WorkerError
+ EventMaxTTL = iota + 8000
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+)
+
+// handles watcher events
+type listener func(event int, ctx interface{})
+
+// defines the watcher behaviour
+type watcherConfig struct {
+ // MaxTTL defines maximum time worker is allowed to live.
+ MaxTTL time.Duration
+
+ // MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
+ MaxMemory uint64
+}
+
+// Normalize watcher config and upscale the durations.
+func (c *watcherConfig) Normalize() error {
+ // Always use second based definition for time durations
+ if c.MaxTTL < time.Microsecond {
+ c.MaxTTL = time.Second * time.Duration(c.MaxTTL.Nanoseconds())
+ }
+
+ return nil
+}
+
+type watcher struct {
+ lsn listener
+ interval time.Duration
+ cfg *watcherConfig
+ stop chan interface{}
+}
+
+// watch the pool state
+func (watch *watcher) watch(p roadrunner.Pool) {
+ now := time.Now()
+ for _, w := range p.Workers() {
+ if watch.cfg.MaxTTL != 0 && now.Sub(w.Created) >= watch.cfg.MaxTTL {
+ err := fmt.Errorf("max TTL reached (%s)", watch.cfg.MaxTTL)
+ if p.Remove(w, err) {
+ watch.report(EventMaxTTL, w, err)
+ }
+ }
+
+ state, err := util.WorkerState(w)
+ if err != nil {
+ continue
+ }
+
+ if watch.cfg.MaxMemory != 0 && state.MemoryUsage >= watch.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", watch.cfg.MaxMemory)
+ if p.Remove(w, err) {
+ watch.report(EventMaxMemory, w, err)
+ }
+ }
+ }
+}
+
+// throw watcher event
+func (watch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
+ if watch.lsn != nil {
+ watch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+ }
+}
+
+// Attach watcher to the pool
+func (watch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
+ wp := &watcher{
+ interval: watch.interval,
+ lsn: watch.lsn,
+ cfg: watch.cfg,
+ stop: make(chan interface{}),
+ }
+
+ go func(wp *watcher, pool roadrunner.Pool) {
+ ticker := time.NewTicker(wp.interval)
+ for {
+ select {
+ case <-ticker.C:
+ wp.watch(pool)
+ case <-wp.stop:
+ return
+ }
+ }
+ }(wp, pool)
+
+ return wp
+}
+
+// Detach watcher from the pool.
+func (watch *watcher) Detach() {
+ close(watch.stop)
}
diff --git a/static_pool.go b/static_pool.go
index 5f60099a..02960825 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -116,13 +116,18 @@ func (p *StaticPool) Workers() (workers []*Worker) {
}
// Remove forces pool to remove specific worker.
-func (p *StaticPool) Remove(w *Worker, err error) {
+func (p *StaticPool) Remove(w *Worker, err error) bool {
if w.State().Value() != StateReady && w.State().Value() != StateWorking {
// unable to remove inactive worker
- return
+ return false
+ }
+
+ if _, ok := p.remove.Load(w); ok {
+ return false
}
p.remove.Store(w, err)
+ return true
}
// Exec one task with given payload and context, returns result or error.