summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/cmd/root.go2
-rw-r--r--rpc/service.go4
-rw-r--r--rpc/service_test.go12
-rw-r--r--service/container.go195
-rw-r--r--service/entry.go48
5 files changed, 131 insertions, 130 deletions
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index d54437f0..c1e03160 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -38,7 +38,7 @@ var (
Logger = logrus.New()
// Services - shared service bus.
- Services = service.NewRegistry(Logger)
+ Services = service.NewContainer(Logger)
// CLI is application endpoint.
CLI = &cobra.Command{
diff --git a/rpc/service.go b/rpc/service.go
index 268087bc..516f341a 100644
--- a/rpc/service.go
+++ b/rpc/service.go
@@ -23,10 +23,6 @@ type Service struct {
// Configure must return configure service and return true if service is enabled. Must return error in case of
// misconfiguration.
func (s *Service) Configure(cfg service.Config, reg service.Container) (enabled bool, err error) {
- if s.cfg != nil {
- return true, errors.New("service is already configured")
- }
-
config := &config{}
if err := cfg.Unmarshal(config); err != nil {
return false, err
diff --git a/rpc/service_test.go b/rpc/service_test.go
index 13dd4930..a57ce1bd 100644
--- a/rpc/service_test.go
+++ b/rpc/service_test.go
@@ -50,18 +50,6 @@ func Test_Enabled(t *testing.T) {
assert.True(t, ok)
}
-func Test_TwoConfigurations(t *testing.T) {
- s := &Service{}
- ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
-
- assert.NoError(t, err)
- assert.True(t, ok)
-
- ok, err = s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
- assert.Error(t, err)
- assert.True(t, ok)
-}
-
func Test_StopNonServing(t *testing.T) {
s := &Service{}
ok, err := s.Configure(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
diff --git a/service/container.go b/service/container.go
index fb1d6874..89acc90b 100644
--- a/service/container.go
+++ b/service/container.go
@@ -19,17 +19,18 @@ type Config interface {
// Container controls all internal RR services and provides plugin based system.
type Container interface {
- // Register add new service to the registry under given name.
+ // Register add new service to the container under given name.
Register(name string, service Service)
// Reconfigure configures all underlying services with given configuration.
Configure(cfg Config) error
- // Check is Service has been registered and configured.
+ // Check if svc has been registered.
Has(service string) bool
- // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance.
- Get(service string) Service
+ // Get returns svc instance by it's name or nil if svc not found. Method returns current service status
+ // as second value.
+ Get(service string) (svc Service, status int)
// Serve all configured services. Non blocking.
Serve() error
@@ -38,138 +39,96 @@ type Container interface {
Stop()
}
-// Service provides high level functionality for road runner Service.
+// svc provides high level functionality for road runner svc.
type Service interface {
- // Configure must return configure service and return true if service is enabled. Must return error in case of
+ // Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
Configure(cfg Config, reg Container) (enabled bool, err error)
- // Serve serves Service.
+ // Serve serves svc.
Serve() error
- // Close setStopped Service Service.
+ // Close setStopped svc svc.
Stop()
}
-type registry struct {
- log logrus.FieldLogger
- mu sync.Mutex
- candidates []*entry
- configured []*entry
+type container struct {
+ log logrus.FieldLogger
+ mu sync.Mutex
+ services []*entry
}
-// entry creates association between service instance and given name.
-type entry struct {
- // Associated service name
- Name string
-
- // Associated service instance
- Service Service
-
- // serving indicates that service is currently serving, todo: needs mutex
- mu sync.Mutex
- serving bool
-}
-
-// serving returns true if service is serving.
-func (e *entry) isServing() bool {
- e.mu.Lock()
- defer e.mu.Unlock()
-
- return e.serving
-}
-
-// setStarted indicates that service is serving.
-func (e *entry) setStarted() {
- e.mu.Lock()
- defer e.mu.Unlock()
-
- e.serving = true
-}
-
-// setStopped indicates that service is being stopped.
-func (e *entry) setStopped() {
- e.mu.Lock()
- defer e.mu.Unlock()
-
- e.serving = false
-}
-
-// NewRegistry creates new registry.
-func NewRegistry(log logrus.FieldLogger) Container {
- return &registry{
- log: log,
- candidates: make([]*entry, 0),
+// NewContainer creates new service container.
+func NewContainer(log logrus.FieldLogger) Container {
+ return &container{
+ log: log,
+ services: make([]*entry, 0),
}
}
-// Register add new service to the registry under given name.
-func (r *registry) Register(name string, service Service) {
+// Register add new service to the container under given name.
+func (r *container) Register(name string, service Service) {
r.mu.Lock()
defer r.mu.Unlock()
- r.candidates = append(r.candidates, &entry{
- Name: name,
- Service: service,
- serving: false,
+ r.services = append(r.services, &entry{
+ name: name,
+ svc: service,
+ status: StatusConfigured,
})
r.log.Debugf("%s.service: registered", name)
}
-// Reconfigure configures all underlying services with given configuration.
-func (r *registry) Configure(cfg Config) error {
- if r.configured != nil {
- return fmt.Errorf("service bus has been already configured")
- }
-
- r.configured = make([]*entry, 0)
- for _, e := range r.candidates {
- segment := cfg.Get(e.Name)
- if segment == nil {
- r.log.Debugf("%s.service: no config has been provided", e.Name)
- continue
- }
+// Check hasStatus svc has been registered.
+func (r *container) Has(target string) bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
- _, err := e.Service.Configure(segment, r)
- if err != nil {
- return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name))
+ for _, e := range r.services {
+ if e.name == target {
+ return true
}
-
- //if s != nil {
- // r.configured = append(r.configured, &entry{
- // Name: e.Name,
- // Service: s,
- // serving: false,
- // })
- //}
}
- return nil
+ return false
}
-// Check is Service has been registered.
-func (r *registry) Has(service string) bool {
+// Get returns svc instance by it's name or nil if svc not found.
+func (r *container) Get(target string) (svc Service, status int) {
r.mu.Lock()
defer r.mu.Unlock()
- for _, e := range r.configured {
- if e.Name == service {
- return true
+ for _, e := range r.services {
+ if e.name == target {
+ return e.svc, e.getStatus()
}
}
- return false
+ return nil, StatusUndefined
}
-// Get returns Service instance by it's Name or nil if Service not found.
-func (r *registry) Get(service string) Service {
+// Reconfigure configures all underlying services with given configuration.
+func (r *container) Configure(cfg Config) error {
r.mu.Lock()
defer r.mu.Unlock()
- for _, e := range r.configured {
- if e.Name == service {
- return e.Service
+ for _, e := range r.services {
+ if e.getStatus() >= StatusConfigured {
+ return fmt.Errorf("service %s has already been configured", e.name)
+ }
+
+ segment := cfg.Get(e.name)
+ if segment == nil {
+ r.log.Debugf("%s.service: no config has been provided", e.name)
+ continue
+ }
+
+ ok, err := e.svc.Configure(segment, r)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("%s.service", e.name))
+ } else if ok {
+ e.setStatus(StatusConfigured)
}
}
@@ -177,26 +136,33 @@ func (r *registry) Get(service string) Service {
}
// Serve all configured services. Non blocking.
-func (r *registry) Serve() error {
- if len(r.configured) == 0 {
- return errors.New("no services attached")
- }
-
- done := make(chan interface{}, len(r.configured))
+func (r *container) Serve() error {
+ var (
+ numServing int
+ done = make(chan interface{}, len(r.services))
+ )
defer close(done)
- for _, s := range r.configured {
+ r.mu.Lock()
+ for _, e := range r.services {
+ if e.hasStatus(StatusConfigured) {
+ numServing ++
+ } else {
+ continue
+ }
+
go func(s *entry) {
- defer s.setStopped()
+ s.setStatus(StatusServing)
+ defer s.setStatus(StatusStopped)
- s.setStarted()
- if err := s.Service.Serve(); err != nil {
+ if err := s.svc.Serve(); err != nil {
done <- err
}
- }(s)
+ }(e)
}
+ r.mu.Unlock()
- for i := 0; i < len(r.configured); i++ {
+ for i := 0; i < numServing; i++ {
result := <-done
// found an error in one of the services, stopping the rest of running services.
@@ -210,10 +176,13 @@ func (r *registry) Serve() error {
}
// Stop sends stop command to all running services.
-func (r *registry) Stop() {
- for _, s := range r.configured {
- if s.isServing() {
- s.Service.Stop()
+func (r *container) Stop() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, e := range r.services {
+ if e.hasStatus(StatusServing) {
+ e.svc.Stop()
}
}
}
diff --git a/service/entry.go b/service/entry.go
new file mode 100644
index 00000000..cf44349c
--- /dev/null
+++ b/service/entry.go
@@ -0,0 +1,48 @@
+package service
+
+import "sync"
+
+const (
+ // StatusUndefined when service bus can not find the service.
+ StatusUndefined = iota
+
+ // StatusRegistered hasStatus setStatus when service has been registered in container.
+ StatusRegistered
+
+ // StatusConfigured hasStatus setStatus when service has been properly configured.
+ StatusConfigured
+
+ // StatusServing hasStatus setStatus when service hasStatus currently serving.
+ StatusServing
+
+ // StatusStopped hasStatus setStatus when service hasStatus stopped.
+ StatusStopped
+)
+
+// entry creates association between service instance and given name.
+type entry struct {
+ name string
+ svc Service
+ mu sync.Mutex
+ status int
+}
+
+// status returns service status
+func (e *entry) getStatus() int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ return e.status
+}
+
+// setStarted indicates that service hasStatus status.
+func (e *entry) setStatus(status int) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.status = status
+}
+
+// hasStatus checks if entry in specific status
+func (e *entry) hasStatus(status int) bool {
+ return e.getStatus() == status
+}