summaryrefslogtreecommitdiff
path: root/service/limit
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-04 19:31:06 +0300
committerWolfy-J <[email protected]>2019-05-04 19:31:06 +0300
commit2afa417f4f46b31b79043e3e56513d51e4ad2fde (patch)
treee3939afb104647e82bd8bbed85f75616c1faf0eb /service/limit
parent2efc533f2aac215d487a80020b0f9bf4ae5209c3 (diff)
renamings
Diffstat (limited to 'service/limit')
-rw-r--r--service/limit/config.go48
-rw-r--r--service/limit/controller.go161
-rw-r--r--service/limit/service.go46
-rw-r--r--service/limit/state_filter.go58
4 files changed, 313 insertions, 0 deletions
diff --git a/service/limit/config.go b/service/limit/config.go
new file mode 100644
index 00000000..bf842ac2
--- /dev/null
+++ b/service/limit/config.go
@@ -0,0 +1,48 @@
+package limit
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Configures set of Services.
+type Config struct {
+ // Interval defines the update duration for underlying controllers, default 1s.
+ Interval time.Duration
+
+ // Services declares list of services to be watched.
+ Services map[string]*controllerConfig
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ // Always use second based definition for time durations
+ if c.Interval < time.Microsecond {
+ c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds())
+ }
+
+ return nil
+}
+
+// InitDefaults sets missing values to their default values.
+func (c *Config) InitDefaults() error {
+ c.Interval = time.Second
+
+ return nil
+}
+
+// Controllers returns list of defined Services
+func (c *Config) Controllers(l listener) (controllers map[string]roadrunner.Controller) {
+ controllers = make(map[string]roadrunner.Controller)
+
+ for name, cfg := range c.Services {
+ controllers[name] = &controller{lsn: l, tick: c.Interval, cfg: cfg}
+ }
+
+ return controllers
+}
diff --git a/service/limit/controller.go b/service/limit/controller.go
new file mode 100644
index 00000000..bdbab003
--- /dev/null
+++ b/service/limit/controller.go
@@ -0,0 +1,161 @@
+package limit
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
+ "time"
+)
+
+const (
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory = iota + 8000
+
+ // EventMaxTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ EventMaxTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time at rest.
+ EventMaxIdleTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventMaxExecTTL
+)
+
+// handles controller events
+type listener func(event int, ctx interface{})
+
+// defines the controller behaviour
+type controllerConfig struct {
+ // MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
+ MaxMemory uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // MaxIdleTTL defines maximum duration worker can spend in idle mode.
+ MaxIdleTTL int64
+
+ // MaxExecTTL defines maximum lifetime per job.
+ MaxExecTTL int64
+}
+
+type controller struct {
+ lsn listener
+ tick time.Duration
+ cfg *controllerConfig
+
+ // list of workers which are currently working
+ sw *stateFilter
+
+ stop chan interface{}
+}
+
+// control the pool state
+func (c *controller) control(p roadrunner.Pool) {
+ c.loadWorkers(p)
+
+ now := time.Now()
+
+ if c.cfg.MaxExecTTL != 0 {
+ for _, w := range c.sw.find(
+ roadrunner.StateWorking,
+ now.Add(-time.Second*time.Duration(c.cfg.MaxExecTTL)),
+ ) {
+ eID := w.State().NumExecs()
+ err := fmt.Errorf("max exec time reached (%vs)", c.cfg.MaxExecTTL)
+
+ // make sure worker still on initial request
+ if p.Remove(w, err) && w.State().NumExecs() == eID {
+ go w.Kill()
+ c.report(EventMaxExecTTL, w, err)
+ }
+ }
+ }
+
+ // locale workers which are in idle mode for too long
+ if c.cfg.MaxIdleTTL != 0 {
+ for _, w := range c.sw.find(
+ roadrunner.StateReady,
+ now.Add(-time.Second*time.Duration(c.cfg.MaxIdleTTL)),
+ ) {
+ err := fmt.Errorf("max idle time reached (%vs)", c.cfg.MaxIdleTTL)
+ if p.Remove(w, err) {
+ c.report(EventMaxIdleTTL, w, err)
+ }
+ }
+ }
+}
+
+func (c *controller) loadWorkers(p roadrunner.Pool) {
+ now := time.Now()
+
+ for _, w := range p.Workers() {
+ if w.State().Value() == roadrunner.StateInvalid {
+ // skip duplicate assessment
+ continue
+ }
+
+ s, err := util.WorkerState(w)
+ if err != nil {
+ continue
+ }
+
+ if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) {
+ err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL)
+ if p.Remove(w, err) {
+ c.report(EventMaxTTL, w, err)
+ }
+ continue
+ }
+
+ if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory)
+ if p.Remove(w, err) {
+ c.report(EventMaxMemory, w, err)
+ }
+ continue
+ }
+
+ // control the worker state changes
+ c.sw.push(w)
+ }
+
+ c.sw.sync(now)
+}
+
+// throw controller event
+func (c *controller) report(event int, worker *roadrunner.Worker, caused error) {
+ if c.lsn != nil {
+ c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+ }
+}
+
+// Attach controller to the pool
+func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller {
+ wp := &controller{
+ tick: c.tick,
+ lsn: c.lsn,
+ cfg: c.cfg,
+ sw: newStateFilter(),
+ stop: make(chan interface{}),
+ }
+
+ go func(wp *controller, pool roadrunner.Pool) {
+ ticker := time.NewTicker(wp.tick)
+ for {
+ select {
+ case <-ticker.C:
+ wp.control(pool)
+ case <-wp.stop:
+ return
+ }
+ }
+ }(wp, pool)
+
+ return wp
+}
+
+// Detach controller from the pool.
+func (c *controller) Detach() {
+ close(c.stop)
+}
diff --git a/service/limit/service.go b/service/limit/service.go
new file mode 100644
index 00000000..72673d1f
--- /dev/null
+++ b/service/limit/service.go
@@ -0,0 +1,46 @@
+package limit
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+)
+
+// ID defines controller service name.
+const ID = "constrain"
+
+// Controllable defines the ability to attach rr controller.
+type Controllable interface {
+ // AddController attaches controller to the service.
+ AddController(c roadrunner.Controller)
+}
+
+// Services to control the state of rr service inside other services.
+type Service struct {
+ cfg *Config
+ lsns []func(event int, ctx interface{})
+}
+
+// Init controller service
+func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
+ // mount Services to designated services
+ for id, watcher := range cfg.Controllers(s.throw) {
+ svc, _ := c.Get(id)
+ if ctrl, ok := svc.(Controllable); ok {
+ ctrl.AddController(watcher)
+ }
+ }
+
+ return true, nil
+}
+
+// AddListener attaches server event controller.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.lsns = append(s.lsns, l)
+}
+
+// throw handles service, server and pool events.
+func (s *Service) throw(event int, ctx interface{}) {
+ for _, l := range s.lsns {
+ l(event, ctx)
+ }
+}
diff --git a/service/limit/state_filter.go b/service/limit/state_filter.go
new file mode 100644
index 00000000..cd2eca94
--- /dev/null
+++ b/service/limit/state_filter.go
@@ -0,0 +1,58 @@
+package limit
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+type stateFilter struct {
+ prev map[*roadrunner.Worker]state
+ next map[*roadrunner.Worker]state
+}
+
+type state struct {
+ state int64
+ numExecs int64
+ since time.Time
+}
+
+func newStateFilter() *stateFilter {
+ return &stateFilter{
+ prev: make(map[*roadrunner.Worker]state),
+ next: make(map[*roadrunner.Worker]state),
+ }
+}
+
+// add new worker to be watched
+func (sw *stateFilter) push(w *roadrunner.Worker) {
+ sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateFilter) sync(t time.Time) {
+ for w := range sw.prev {
+ if _, ok := sw.next[w]; !ok {
+ delete(sw.prev, w)
+ }
+ }
+
+ for w, s := range sw.next {
+ ps, ok := sw.prev[w]
+ if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+ sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+ }
+
+ delete(sw.next, w)
+ }
+}
+
+// find all workers which spend given amount of time in a specific state.
+func (sw *stateFilter) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+ for w, s := range sw.prev {
+ if s.state == state && s.since.Before(since) {
+ workers = append(workers, w)
+ }
+ }
+
+ return
+}