diff options
-rw-r--r-- | cmd/rr/http/reload.go | 29 | ||||
-rw-r--r-- | cmd/rr/http/workers.go | 12 | ||||
-rw-r--r-- | service/container.go | 73 | ||||
-rw-r--r-- | service/container_test.go | 311 |
4 files changed, 369 insertions, 56 deletions
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go index 5814cc4a..06149065 100644 --- a/cmd/rr/http/reload.go +++ b/cmd/rr/http/reload.go @@ -24,7 +24,6 @@ import ( "errors" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/rpc" ) func init() { @@ -39,18 +38,20 @@ func reloadHandler(cmd *cobra.Command, args []string) error { if !rr.Container.Has("rpc") { return errors.New("RPC service is not configured") } - - client, err := rr.Container.Get("rpc").(*rpc.Service).Client() - if err != nil { - return err - } - defer client.Close() - - var r string - if err := client.Call("http.Reset", true, &r); err != nil { - return err - } - - rr.Logger.Info("http.service: restarting worker pool") return nil + + //todo: change + //client, err := rr.Container.Get("rpc").(*rpc.Service).Client() + //if err != nil { + // return err + //} + //defer client.Close() + // + //var r string + //if err := client.Call("http.Reset", true, &r); err != nil { + // return err + //} + // + //rr.Logger.Info("http.service: restarting worker pool") + //return nil } diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index 6d74a412..c9aa5b04 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -24,7 +24,6 @@ import ( "errors" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/rpc" ) func init() { @@ -40,11 +39,12 @@ func workersHandler(cmd *cobra.Command, args []string) error { return errors.New("RPC service is not configured") } - client, err := rr.Container.Get("rpc").(*rpc.Service).Client() - if err != nil { - return err - } - defer client.Close() + //todo: change + //client, err := rr.Container.Get("rpc").(*rpc.Service).Client() + //if err != nil { + // return err + //} + //defer client.Close() //var r http.WorkerList //if err := client.Call("http.Workers", true, &r); err != nil { diff --git a/service/container.go b/service/container.go index 0a1ab472..88d9ef41 100644 --- a/service/container.go +++ b/service/container.go @@ -43,7 +43,7 @@ type Container interface { type Service interface { // Configure must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. - Configure(cfg Config, reg Container) (enabled bool, err error) + Configure(cfg Config, c Container) (enabled bool, err error) // Serve serves svc. Serve() error @@ -67,25 +67,25 @@ func NewContainer(log logrus.FieldLogger) Container { } // Register add new service to the container under given name. -func (r *container) Register(name string, service Service) { - r.mu.Lock() - defer r.mu.Unlock() +func (c *container) Register(name string, service Service) { + c.mu.Lock() + defer c.mu.Unlock() - r.services = append(r.services, &entry{ + c.services = append(c.services, &entry{ name: name, svc: service, - status: StatusConfigured, + status: StatusRegistered, }) - r.log.Debugf("%s.service: registered", name) + c.log.Debugf("%s.service: registered", name) } // Check hasStatus svc has been registered. -func (r *container) Has(target string) bool { - r.mu.Lock() - defer r.mu.Unlock() +func (c *container) Has(target string) bool { + c.mu.Lock() + defer c.mu.Unlock() - for _, e := range r.services { + for _, e := range c.services { if e.name == target { return true } @@ -95,11 +95,11 @@ func (r *container) Has(target string) bool { } // Get returns svc instance by it's name or nil if svc not found. -func (r *container) Get(target string) (svc Service, status int) { - r.mu.Lock() - defer r.mu.Unlock() +func (c *container) Get(target string) (svc Service, status int) { + c.mu.Lock() + defer c.mu.Unlock() - for _, e := range r.services { + for _, e := range c.services { if e.name == target { return e.svc, e.getStatus() } @@ -109,22 +109,22 @@ func (r *container) Get(target string) (svc Service, status int) { } // Configure configures all underlying services with given configuration. -func (r *container) Configure(cfg Config) error { - r.mu.Lock() - defer r.mu.Unlock() +func (c *container) Configure(cfg Config) error { + c.mu.Lock() + defer c.mu.Unlock() - for _, e := range r.services { + for _, e := range c.services { if e.getStatus() >= StatusConfigured { return fmt.Errorf("service %s has already been configured", e.name) } segment := cfg.Get(e.name) if segment == nil { - r.log.Debugf("%s.service: no config has been provided", e.name) + c.log.Debugf("%s.service: no config has been provided", e.name) continue } - ok, err := e.svc.Configure(segment, r) + ok, err := e.svc.Configure(segment, c) if err != nil { return errors.Wrap(err, fmt.Sprintf("%s.service", e.name)) } else if ok { @@ -136,38 +136,38 @@ func (r *container) Configure(cfg Config) error { } // Serve all configured services. Non blocking. -func (r *container) Serve() error { +func (c *container) Serve() error { var ( numServing int - done = make(chan interface{}, len(r.services)) + done = make(chan interface{}, len(c.services)) ) defer close(done) - r.mu.Lock() - for _, e := range r.services { + c.mu.Lock() + for _, e := range c.services { if e.hasStatus(StatusConfigured) { numServing ++ } else { continue } - go func(s *entry) { - s.setStatus(StatusServing) - defer s.setStatus(StatusStopped) + go func(e *entry) { + e.setStatus(StatusServing) + defer e.setStatus(StatusStopped) - if err := s.svc.Serve(); err != nil { - done <- err + if err := e.svc.Serve(); err != nil { + done <- errors.Wrap(err, fmt.Sprintf("%s.service", e.name)) } }(e) } - r.mu.Unlock() + c.mu.Unlock() for i := 0; i < numServing; i++ { result := <-done // found an error in one of the services, stopping the rest of running services. if err, ok := result.(error); ok { - r.Stop() + c.Stop() return err } } @@ -176,13 +176,14 @@ func (r *container) Serve() error { } // Stop sends stop command to all running services. -func (r *container) Stop() { - r.mu.Lock() - defer r.mu.Unlock() +func (c *container) Stop() { + c.mu.Lock() + defer c.mu.Unlock() - for _, e := range r.services { + for _, e := range c.services { if e.hasStatus(StatusServing) { e.svc.Stop() + e.setStatus(StatusStopped) } } } diff --git a/service/container_test.go b/service/container_test.go index 6d43c336..69d67441 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -1 +1,312 @@ package service + +import ( + "testing" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/sirupsen/logrus" + "encoding/json" + "errors" + "time" +) + +type testService struct { + waitForServe chan interface{} + delay time.Duration + ok bool + cfg Config + c Container + cfgE, serveE error + serving chan interface{} +} + +func (t *testService) Configure(cfg Config, c Container) (enabled bool, err error) { + t.cfg = cfg + t.c = c + return t.ok, t.cfgE +} + +func (t *testService) Serve() error { + time.Sleep(t.delay) + + if t.serveE != nil { + return t.serveE + } + + if t.waitForServe != nil { + close(t.waitForServe) + t.waitForServe = nil + } + + t.serving = make(chan interface{}) + <-t.serving + + return nil +} + +func (t *testService) Stop() { + close(t.serving) +} + +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) Config { + vars := make(map[string]string) + json.Unmarshal([]byte(cfg.cfg), &vars) + + v, ok := vars[name] + if !ok { + return nil + } + + return &testCfg{cfg: v} +} +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func TestContainer_Register(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 1, len(hook.Entries)) +} + +func TestContainer_Has(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 1, len(hook.Entries)) + + assert.True(t, c.Has("test")) + assert.False(t, c.Has("another")) +} + +func TestContainer_Get(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 1, len(hook.Entries)) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) + + s, st = c.Get("another") + assert.Nil(t, s) + assert.Equal(t, StatusUndefined, st) +} + +func TestContainer_Stop_NotStarted(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + assert.Equal(t, 1, len(hook.Entries)) + + c.Stop() +} + +func TestContainer_Configure(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`})) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusConfigured, st) +} + +func TestContainer_ConfigureNull(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Configure(&testCfg{`{"another":"something"}`})) + assert.Equal(t, 2, len(hook.Entries)) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) +} + +func TestContainer_ConfigureDisabled(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: false} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`})) + assert.Equal(t, 1, len(hook.Entries)) + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) +} + +func TestContainer_ConfigureError(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: false, + cfgE: errors.New("configure error"), + } + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + err := c.Configure(&testCfg{`{"test":"something"}`}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "configure error") + assert.Contains(t, err.Error(), "test.service") + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusRegistered, st) +} + +func TestContainer_ConfigureTwice(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`})) + assert.Error(t, c.Configure(&testCfg{`{"test":"something"}`})) +} + +func TestContainer_ServeEmptyContainer(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + + assert.NoError(t, c.Serve()) + c.Stop() +} + +func TestContainer_Serve(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: true, + waitForServe: make(chan interface{}), + } + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`})) + + go func() { + assert.NoError(t, c.Serve()) + }() + + <-svc.waitForServe + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusServing, st) + + c.Stop() + + s, st = c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) +} + +func TestContainer_ServeError(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: true, + waitForServe: make(chan interface{}), + serveE: errors.New("serve error"), + } + + c := NewContainer(logger) + c.Register("test", svc) + assert.Equal(t, 1, len(hook.Entries)) + assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`})) + + err := c.Serve() + assert.Error(t, err) + assert.Contains(t, err.Error(), "serve error") + assert.Contains(t, err.Error(), "test.service") + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) +} + +func TestContainer_ServeErrorMultiple(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ + ok: true, + delay: time.Millisecond * 10, + waitForServe: make(chan interface{}), + serveE: errors.New("serve error"), + } + + svc2 := &testService{ + ok: true, + waitForServe: make(chan interface{}), + } + + c := NewContainer(logger) + c.Register("test2", svc2) + c.Register("test", svc) + assert.Equal(t, 2, len(hook.Entries)) + assert.NoError(t, c.Configure(&testCfg{`{"test":"something", "test2":"something-else"}`})) + + err := c.Serve() + assert.Error(t, err) + assert.Contains(t, err.Error(), "serve error") + assert.Contains(t, err.Error(), "test.service") + + s, st := c.Get("test") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) + + s, st = c.Get("test2") + assert.IsType(t, &testService{}, s) + assert.Equal(t, StatusStopped, st) +} |