summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/http/reload.go29
-rw-r--r--cmd/rr/http/workers.go12
-rw-r--r--service/container.go73
-rw-r--r--service/container_test.go311
4 files changed, 369 insertions, 56 deletions
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go
index 5814cc4a..06149065 100644
--- a/cmd/rr/http/reload.go
+++ b/cmd/rr/http/reload.go
@@ -24,7 +24,6 @@ import (
"errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/rpc"
)
func init() {
@@ -39,18 +38,20 @@ func reloadHandler(cmd *cobra.Command, args []string) error {
if !rr.Container.Has("rpc") {
return errors.New("RPC service is not configured")
}
-
- client, err := rr.Container.Get("rpc").(*rpc.Service).Client()
- if err != nil {
- return err
- }
- defer client.Close()
-
- var r string
- if err := client.Call("http.Reset", true, &r); err != nil {
- return err
- }
-
- rr.Logger.Info("http.service: restarting worker pool")
return nil
+
+ //todo: change
+ //client, err := rr.Container.Get("rpc").(*rpc.Service).Client()
+ //if err != nil {
+ // return err
+ //}
+ //defer client.Close()
+ //
+ //var r string
+ //if err := client.Call("http.Reset", true, &r); err != nil {
+ // return err
+ //}
+ //
+ //rr.Logger.Info("http.service: restarting worker pool")
+ //return nil
}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index 6d74a412..c9aa5b04 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -24,7 +24,6 @@ import (
"errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/rpc"
)
func init() {
@@ -40,11 +39,12 @@ func workersHandler(cmd *cobra.Command, args []string) error {
return errors.New("RPC service is not configured")
}
- client, err := rr.Container.Get("rpc").(*rpc.Service).Client()
- if err != nil {
- return err
- }
- defer client.Close()
+ //todo: change
+ //client, err := rr.Container.Get("rpc").(*rpc.Service).Client()
+ //if err != nil {
+ // return err
+ //}
+ //defer client.Close()
//var r http.WorkerList
//if err := client.Call("http.Workers", true, &r); err != nil {
diff --git a/service/container.go b/service/container.go
index 0a1ab472..88d9ef41 100644
--- a/service/container.go
+++ b/service/container.go
@@ -43,7 +43,7 @@ type Container interface {
type Service interface {
// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
- Configure(cfg Config, reg Container) (enabled bool, err error)
+ Configure(cfg Config, c Container) (enabled bool, err error)
// Serve serves svc.
Serve() error
@@ -67,25 +67,25 @@ func NewContainer(log logrus.FieldLogger) Container {
}
// Register add new service to the container under given name.
-func (r *container) Register(name string, service Service) {
- r.mu.Lock()
- defer r.mu.Unlock()
+func (c *container) Register(name string, service Service) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
- r.services = append(r.services, &entry{
+ c.services = append(c.services, &entry{
name: name,
svc: service,
- status: StatusConfigured,
+ status: StatusRegistered,
})
- r.log.Debugf("%s.service: registered", name)
+ c.log.Debugf("%s.service: registered", name)
}
// Check hasStatus svc has been registered.
-func (r *container) Has(target string) bool {
- r.mu.Lock()
- defer r.mu.Unlock()
+func (c *container) Has(target string) bool {
+ c.mu.Lock()
+ defer c.mu.Unlock()
- for _, e := range r.services {
+ for _, e := range c.services {
if e.name == target {
return true
}
@@ -95,11 +95,11 @@ func (r *container) Has(target string) bool {
}
// Get returns svc instance by it's name or nil if svc not found.
-func (r *container) Get(target string) (svc Service, status int) {
- r.mu.Lock()
- defer r.mu.Unlock()
+func (c *container) Get(target string) (svc Service, status int) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
- for _, e := range r.services {
+ for _, e := range c.services {
if e.name == target {
return e.svc, e.getStatus()
}
@@ -109,22 +109,22 @@ func (r *container) Get(target string) (svc Service, status int) {
}
// Configure configures all underlying services with given configuration.
-func (r *container) Configure(cfg Config) error {
- r.mu.Lock()
- defer r.mu.Unlock()
+func (c *container) Configure(cfg Config) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
- for _, e := range r.services {
+ for _, e := range c.services {
if e.getStatus() >= StatusConfigured {
return fmt.Errorf("service %s has already been configured", e.name)
}
segment := cfg.Get(e.name)
if segment == nil {
- r.log.Debugf("%s.service: no config has been provided", e.name)
+ c.log.Debugf("%s.service: no config has been provided", e.name)
continue
}
- ok, err := e.svc.Configure(segment, r)
+ ok, err := e.svc.Configure(segment, c)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("%s.service", e.name))
} else if ok {
@@ -136,38 +136,38 @@ func (r *container) Configure(cfg Config) error {
}
// Serve all configured services. Non blocking.
-func (r *container) Serve() error {
+func (c *container) Serve() error {
var (
numServing int
- done = make(chan interface{}, len(r.services))
+ done = make(chan interface{}, len(c.services))
)
defer close(done)
- r.mu.Lock()
- for _, e := range r.services {
+ c.mu.Lock()
+ for _, e := range c.services {
if e.hasStatus(StatusConfigured) {
numServing ++
} else {
continue
}
- go func(s *entry) {
- s.setStatus(StatusServing)
- defer s.setStatus(StatusStopped)
+ go func(e *entry) {
+ e.setStatus(StatusServing)
+ defer e.setStatus(StatusStopped)
- if err := s.svc.Serve(); err != nil {
- done <- err
+ if err := e.svc.Serve(); err != nil {
+ done <- errors.Wrap(err, fmt.Sprintf("%s.service", e.name))
}
}(e)
}
- r.mu.Unlock()
+ c.mu.Unlock()
for i := 0; i < numServing; i++ {
result := <-done
// found an error in one of the services, stopping the rest of running services.
if err, ok := result.(error); ok {
- r.Stop()
+ c.Stop()
return err
}
}
@@ -176,13 +176,14 @@ func (r *container) Serve() error {
}
// Stop sends stop command to all running services.
-func (r *container) Stop() {
- r.mu.Lock()
- defer r.mu.Unlock()
+func (c *container) Stop() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
- for _, e := range r.services {
+ for _, e := range c.services {
if e.hasStatus(StatusServing) {
e.svc.Stop()
+ e.setStatus(StatusStopped)
}
}
}
diff --git a/service/container_test.go b/service/container_test.go
index 6d43c336..69d67441 100644
--- a/service/container_test.go
+++ b/service/container_test.go
@@ -1 +1,312 @@
package service
+
+import (
+ "testing"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/sirupsen/logrus"
+ "encoding/json"
+ "errors"
+ "time"
+)
+
+type testService struct {
+ waitForServe chan interface{}
+ delay time.Duration
+ ok bool
+ cfg Config
+ c Container
+ cfgE, serveE error
+ serving chan interface{}
+}
+
+func (t *testService) Configure(cfg Config, c Container) (enabled bool, err error) {
+ t.cfg = cfg
+ t.c = c
+ return t.ok, t.cfgE
+}
+
+func (t *testService) Serve() error {
+ time.Sleep(t.delay)
+
+ if t.serveE != nil {
+ return t.serveE
+ }
+
+ if t.waitForServe != nil {
+ close(t.waitForServe)
+ t.waitForServe = nil
+ }
+
+ t.serving = make(chan interface{})
+ <-t.serving
+
+ return nil
+}
+
+func (t *testService) Stop() {
+ close(t.serving)
+}
+
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Get(name string) Config {
+ vars := make(map[string]string)
+ json.Unmarshal([]byte(cfg.cfg), &vars)
+
+ v, ok := vars[name]
+ if !ok {
+ return nil
+ }
+
+ return &testCfg{cfg: v}
+}
+func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func TestContainer_Register(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 1, len(hook.Entries))
+}
+
+func TestContainer_Has(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.True(t, c.Has("test"))
+ assert.False(t, c.Has("another"))
+}
+
+func TestContainer_Get(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 1, len(hook.Entries))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+
+ s, st = c.Get("another")
+ assert.Nil(t, s)
+ assert.Equal(t, StatusUndefined, st)
+}
+
+func TestContainer_Stop_NotStarted(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+ assert.Equal(t, 1, len(hook.Entries))
+
+ c.Stop()
+}
+
+func TestContainer_Configure(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`}))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusConfigured, st)
+}
+
+func TestContainer_ConfigureNull(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Configure(&testCfg{`{"another":"something"}`}))
+ assert.Equal(t, 2, len(hook.Entries))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+}
+
+func TestContainer_ConfigureDisabled(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: false}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`}))
+ assert.Equal(t, 1, len(hook.Entries))
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+}
+
+func TestContainer_ConfigureError(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: false,
+ cfgE: errors.New("configure error"),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ err := c.Configure(&testCfg{`{"test":"something"}`})
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "configure error")
+ assert.Contains(t, err.Error(), "test.service")
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusRegistered, st)
+}
+
+func TestContainer_ConfigureTwice(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`}))
+ assert.Error(t, c.Configure(&testCfg{`{"test":"something"}`}))
+}
+
+func TestContainer_ServeEmptyContainer(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{ok: true}
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+
+ assert.NoError(t, c.Serve())
+ c.Stop()
+}
+
+func TestContainer_Serve(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: true,
+ waitForServe: make(chan interface{}),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+ assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`}))
+
+ go func() {
+ assert.NoError(t, c.Serve())
+ }()
+
+ <-svc.waitForServe
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusServing, st)
+
+ c.Stop()
+
+ s, st = c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+}
+
+func TestContainer_ServeError(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: true,
+ waitForServe: make(chan interface{}),
+ serveE: errors.New("serve error"),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test", svc)
+ assert.Equal(t, 1, len(hook.Entries))
+ assert.NoError(t, c.Configure(&testCfg{`{"test":"something"}`}))
+
+ err := c.Serve()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "serve error")
+ assert.Contains(t, err.Error(), "test.service")
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+}
+
+func TestContainer_ServeErrorMultiple(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ svc := &testService{
+ ok: true,
+ delay: time.Millisecond * 10,
+ waitForServe: make(chan interface{}),
+ serveE: errors.New("serve error"),
+ }
+
+ svc2 := &testService{
+ ok: true,
+ waitForServe: make(chan interface{}),
+ }
+
+ c := NewContainer(logger)
+ c.Register("test2", svc2)
+ c.Register("test", svc)
+ assert.Equal(t, 2, len(hook.Entries))
+ assert.NoError(t, c.Configure(&testCfg{`{"test":"something", "test2":"something-else"}`}))
+
+ err := c.Serve()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "serve error")
+ assert.Contains(t, err.Error(), "test.service")
+
+ s, st := c.Get("test")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+
+ s, st = c.Get("test2")
+ assert.IsType(t, &testService{}, s)
+ assert.Equal(t, StatusStopped, st)
+}