diff options
author | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
commit | 0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch) | |
tree | ffcb65010bebe9f5b5436192979e64b2402a6ec0 /service/limit | |
parent | 08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff) |
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'service/limit')
-rw-r--r-- | service/limit/config.go | 48 | ||||
-rw-r--r-- | service/limit/config_test.go | 51 | ||||
-rw-r--r-- | service/limit/controller.go | 166 | ||||
-rw-r--r-- | service/limit/service.go | 39 | ||||
-rw-r--r-- | service/limit/service_test.go | 500 | ||||
-rw-r--r-- | service/limit/state_filter.go | 58 |
6 files changed, 0 insertions, 862 deletions
diff --git a/service/limit/config.go b/service/limit/config.go deleted file mode 100644 index 203db11b..00000000 --- a/service/limit/config.go +++ /dev/null @@ -1,48 +0,0 @@ -package limit - -import ( - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/service" - "time" -) - -// Config of Limit service. -type Config struct { - // Interval defines the update duration for underlying controllers, default 1s. - Interval time.Duration - - // Services declares list of services to be watched. - Services map[string]*controllerConfig -} - -// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - // Always use second based definition for time durations - if c.Interval < time.Microsecond { - c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds()) - } - - return nil -} - -// InitDefaults sets missing values to their default values. -func (c *Config) InitDefaults() error { - c.Interval = time.Second - - return nil -} - -// Controllers returns list of defined Services -func (c *Config) Controllers(l listener) (controllers map[string]roadrunner.Controller) { - controllers = make(map[string]roadrunner.Controller) - - for name, cfg := range c.Services { - controllers[name] = &controller{lsn: l, tick: c.Interval, cfg: cfg} - } - - return controllers -} diff --git a/service/limit/config_test.go b/service/limit/config_test.go deleted file mode 100644 index c79836b8..00000000 --- a/service/limit/config_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package limit - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - return j.Unmarshal([]byte(cfg.cfg), out) -} - -func Test_Config_Hydrate_Error1(t *testing.T) { - cfg := &mockCfg{`{"enable: true}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Controller_Default(t *testing.T) { - cfg := &mockCfg{` -{ - "services":{ - "http": { - "TTL": 1 - } - } -} -`} - c := &Config{} - err := c.InitDefaults() - if err != nil { - t.Errorf("failed to InitDefaults: error %v", err) - } - - assert.NoError(t, c.Hydrate(cfg)) - assert.Equal(t, time.Second, c.Interval) - - list := c.Controllers(func(event int, ctx interface{}) { - }) - - sc := list["http"] - - assert.Equal(t, time.Second, sc.(*controller).tick) -} diff --git a/service/limit/controller.go b/service/limit/controller.go deleted file mode 100644 index 24a158f7..00000000 --- a/service/limit/controller.go +++ /dev/null @@ -1,166 +0,0 @@ -package limit - -import ( - "fmt" - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/util" - "time" -) - -const ( - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory = iota + 8000 - - // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL -) - -// handles controller events -type listener func(event int, ctx interface{}) - -// defines the controller behaviour -type controllerConfig struct { - // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. - MaxMemory uint64 - - // TTL defines maximum time worker is allowed to live. - TTL int64 - - // IdleTTL defines maximum duration worker can spend in idle mode. - IdleTTL int64 - - // ExecTTL defines maximum lifetime per job. - ExecTTL int64 -} - -type controller struct { - lsn listener - tick time.Duration - cfg *controllerConfig - - // list of workers which are currently working - sw *stateFilter - - stop chan interface{} -} - -// control the pool state -func (c *controller) control(p roadrunner.Pool) { - c.loadWorkers(p) - - now := time.Now() - - if c.cfg.ExecTTL != 0 { - for _, w := range c.sw.find( - roadrunner.StateWorking, - now.Add(-time.Second*time.Duration(c.cfg.ExecTTL)), - ) { - eID := w.State().NumExecs() - err := fmt.Errorf("max exec time reached (%vs)", c.cfg.ExecTTL) - - // make sure worker still on initial request - if p.Remove(w, err) && w.State().NumExecs() == eID { - go func() { - err := w.Kill() - if err != nil { - fmt.Printf("error killing worker with PID number: %d, created: %s", w.Pid, w.Created) - } - }() - c.report(EventExecTTL, w, err) - } - } - } - - // locale workers which are in idle mode for too long - if c.cfg.IdleTTL != 0 { - for _, w := range c.sw.find( - roadrunner.StateReady, - now.Add(-time.Second*time.Duration(c.cfg.IdleTTL)), - ) { - err := fmt.Errorf("max idle time reached (%vs)", c.cfg.IdleTTL) - if p.Remove(w, err) { - c.report(EventIdleTTL, w, err) - } - } - } -} - -func (c *controller) loadWorkers(p roadrunner.Pool) { - now := time.Now() - - for _, w := range p.Workers() { - if w.State().Value() == roadrunner.StateInvalid { - // skip duplicate assessment - continue - } - - s, err := util.WorkerState(w) - if err != nil { - continue - } - - if c.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(c.cfg.TTL) { - err := fmt.Errorf("max TTL reached (%vs)", c.cfg.TTL) - if p.Remove(w, err) { - c.report(EventTTL, w, err) - } - continue - } - - if c.cfg.MaxMemory != 0 && s.MemoryUsage >= c.cfg.MaxMemory*1024*1024 { - err := fmt.Errorf("max allowed memory reached (%vMB)", c.cfg.MaxMemory) - if p.Remove(w, err) { - c.report(EventMaxMemory, w, err) - } - continue - } - - // control the worker state changes - c.sw.push(w) - } - - c.sw.sync(now) -} - -// throw controller event -func (c *controller) report(event int, worker *roadrunner.Worker, caused error) { - if c.lsn != nil { - c.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) - } -} - -// Attach controller to the pool -func (c *controller) Attach(pool roadrunner.Pool) roadrunner.Controller { - wp := &controller{ - tick: c.tick, - lsn: c.lsn, - cfg: c.cfg, - sw: newStateFilter(), - stop: make(chan interface{}), - } - - go func(wp *controller, pool roadrunner.Pool) { - ticker := time.NewTicker(wp.tick) - for { - select { - case <-ticker.C: - wp.control(pool) - case <-wp.stop: - return - } - } - }(wp, pool) - - return wp -} - -// Detach controller from the pool. -func (c *controller) Detach() { - close(c.stop) -} diff --git a/service/limit/service.go b/service/limit/service.go deleted file mode 100644 index c0b4139c..00000000 --- a/service/limit/service.go +++ /dev/null @@ -1,39 +0,0 @@ -package limit - -import ( - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/service" -) - -// ID defines controller service name. -const ID = "limit" - -// Service to control the state of rr service inside other services. -type Service struct { - lsns []func(event int, ctx interface{}) -} - -// Init controller service -func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { - // mount Services to designated services - for id, watcher := range cfg.Controllers(s.throw) { - svc, _ := c.Get(id) - if ctrl, ok := svc.(roadrunner.Attacher); ok { - ctrl.Attach(watcher) - } - } - - return true, nil -} - -// AddListener attaches server event controller. -func (s *Service) AddListener(l func(event int, ctx interface{})) { - s.lsns = append(s.lsns, l) -} - -// throw handles service, server and pool events. -func (s *Service) throw(event int, ctx interface{}) { - for _, l := range s.lsns { - l(event, ctx) - } -} diff --git a/service/limit/service_test.go b/service/limit/service_test.go deleted file mode 100644 index b358c1c1..00000000 --- a/service/limit/service_test.go +++ /dev/null @@ -1,500 +0,0 @@ -package limit - -import ( - "fmt" - "github.com/cenkalti/backoff/v4" - json "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner/service" - rrhttp "github.com/spiral/roadrunner/service/http" - "github.com/stretchr/testify/assert" - "io/ioutil" - "net/http" - "testing" - "time" -) - -type testCfg struct { - httpCfg string - limitCfg string - target string -} - -func (cfg *testCfg) Get(name string) service.Config { - if name == rrhttp.ID { - if cfg.httpCfg == "" { - return nil - } - - return &testCfg{target: cfg.httpCfg} - } - - if name == ID { - return &testCfg{target: cfg.limitCfg} - } - - return nil -} - -func (cfg *testCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - err := j.Unmarshal([]byte(cfg.target), out) - - if cl, ok := out.(*Config); ok { - // to speed up tests - cl.Interval = time.Millisecond - } - - return err -} - -func Test_Service_PidEcho(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{ - httpCfg: `{ - "address": ":17029", - "workers":{ - "command": "php ../../tests/http/client.php pid pipes", - "pool": {"numWorkers": 1} - } - }`, - limitCfg: `{ - "services": { - "http": { - "ttl": 1 - } - } - }`, - }) - if err != nil { - return err - } - - s, _ := c.Get(rrhttp.ID) - assert.NotNil(t, s) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Millisecond * 100) - req, err := http.NewRequest("GET", "http://localhost:17029", nil) - if err != nil { - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - - assert.Equal(t, getPID(s), string(b)) - - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("error during the body closing: error %v", err2) - } - c.Stop() - return nil - - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} - -func Test_Service_ListenerPlusTTL(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{ - httpCfg: `{ - "address": ":7030", - "workers":{ - "command": "php ../../tests/http/client.php pid pipes", - "pool": {"numWorkers": 1} - } - }`, - limitCfg: `{ - "services": { - "http": { - "ttl": 1 - } - } - }`, - }) - if err != nil { - return err - } - - s, _ := c.Get(rrhttp.ID) - assert.NotNil(t, s) - - l, _ := c.Get(ID) - captured := make(chan interface{}) - l.(*Service).AddListener(func(event int, ctx interface{}) { - if event == EventTTL { - close(captured) - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - - time.Sleep(time.Millisecond * 100) - - lastPID := getPID(s) - - req, err := http.NewRequest("GET", "http://localhost:7030", nil) - if err != nil { - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - assert.Equal(t, lastPID, string(b)) - - <-captured - - // clean state - req, err = http.NewRequest("GET", "http://localhost:7030?new", nil) - if err != nil { - return err - } - - _, err = http.DefaultClient.Do(req) - if err != nil { - return err - } - - assert.NotEqual(t, lastPID, getPID(s)) - - c.Stop() - - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("error during the body closing: error %v", err2) - } - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} - -func Test_Service_ListenerPlusIdleTTL(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{ - httpCfg: `{ - "address": ":7031", - "workers":{ - "command": "php ../../tests/http/client.php pid pipes", - "pool": {"numWorkers": 1} - } - }`, - limitCfg: `{ - "services": { - "http": { - "idleTtl": 1 - } - } - }`, - }) - if err != nil { - return err - } - - s, _ := c.Get(rrhttp.ID) - assert.NotNil(t, s) - - l, _ := c.Get(ID) - captured := make(chan interface{}) - l.(*Service).AddListener(func(event int, ctx interface{}) { - if event == EventIdleTTL { - close(captured) - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - - time.Sleep(time.Millisecond * 100) - - lastPID := getPID(s) - - req, err := http.NewRequest("GET", "http://localhost:7031", nil) - if err != nil { - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - assert.Equal(t, lastPID, string(b)) - - <-captured - - // clean state - req, err = http.NewRequest("GET", "http://localhost:7031?new", nil) - if err != nil { - return err - } - - _, err = http.DefaultClient.Do(req) - if err != nil { - return err - } - - assert.NotEqual(t, lastPID, getPID(s)) - - c.Stop() - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("error during the body closing: error %v", err2) - } - return nil - }, bkoff) - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Listener_MaxExecTTL(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{ - httpCfg: `{ - "address": ":7032", - "workers":{ - "command": "php ../../tests/http/client.php stuck pipes", - "pool": {"numWorkers": 1} - } - }`, - limitCfg: `{ - "services": { - "http": { - "execTTL": 1 - } - } - }`, - }) - if err != nil { - return err - } - - s, _ := c.Get(rrhttp.ID) - assert.NotNil(t, s) - - l, _ := c.Get(ID) - captured := make(chan interface{}) - l.(*Service).AddListener(func(event int, ctx interface{}) { - if event == EventExecTTL { - close(captured) - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Millisecond * 100) - - req, err := http.NewRequest("GET", "http://localhost:7032", nil) - if err != nil { - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - assert.Equal(t, 500, r.StatusCode) - - <-captured - - c.Stop() - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Listener_MaxMemoryUsage(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{ - httpCfg: `{ - "address": ":7033", - "workers":{ - "command": "php ../../tests/http/client.php memleak pipes", - "pool": {"numWorkers": 1} - } - }`, - limitCfg: `{ - "services": { - "http": { - "maxMemory": 10 - } - } - }`, - }) - if err != nil { - return err - } - - s, _ := c.Get(rrhttp.ID) - assert.NotNil(t, s) - - l, _ := c.Get(ID) - captured := make(chan interface{}) - once := false - l.(*Service).AddListener(func(event int, ctx interface{}) { - if event == EventMaxMemory && !once { - close(captured) - once = true - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Millisecond * 100) - - lastPID := getPID(s) - - req, err := http.NewRequest("GET", "http://localhost:7033", nil) - if err != nil { - return err - } - - for { - select { - case <-captured: - _, err := http.DefaultClient.Do(req) - if err != nil { - c.Stop() - t.Errorf("error during sending the http request: error %v", err) - } - assert.NotEqual(t, lastPID, getPID(s)) - c.Stop() - return nil - default: - _, err := http.DefaultClient.Do(req) - if err != nil { - c.Stop() - t.Errorf("error during sending the http request: error %v", err) - } - c.Stop() - return nil - } - } - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} -func getPID(s interface{}) string { - if len(s.(*rrhttp.Service).Server().Workers()) > 0 { - w := s.(*rrhttp.Service).Server().Workers()[0] - return fmt.Sprintf("%v", *w.Pid) - } else { - panic("no workers") - } -} diff --git a/service/limit/state_filter.go b/service/limit/state_filter.go deleted file mode 100644 index cd2eca94..00000000 --- a/service/limit/state_filter.go +++ /dev/null @@ -1,58 +0,0 @@ -package limit - -import ( - "github.com/spiral/roadrunner" - "time" -) - -type stateFilter struct { - prev map[*roadrunner.Worker]state - next map[*roadrunner.Worker]state -} - -type state struct { - state int64 - numExecs int64 - since time.Time -} - -func newStateFilter() *stateFilter { - return &stateFilter{ - prev: make(map[*roadrunner.Worker]state), - next: make(map[*roadrunner.Worker]state), - } -} - -// add new worker to be watched -func (sw *stateFilter) push(w *roadrunner.Worker) { - sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()} -} - -// update worker states. -func (sw *stateFilter) sync(t time.Time) { - for w := range sw.prev { - if _, ok := sw.next[w]; !ok { - delete(sw.prev, w) - } - } - - for w, s := range sw.next { - ps, ok := sw.prev[w] - if !ok || ps.state != s.state || ps.numExecs != s.numExecs { - sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t} - } - - delete(sw.next, w) - } -} - -// find all workers which spend given amount of time in a specific state. -func (sw *stateFilter) find(state int64, since time.Time) (workers []*roadrunner.Worker) { - for w, s := range sw.prev { - if s.state == state && s.since.Before(since) { - workers = append(workers, w) - } - } - - return -} |