summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-10 13:36:59 +0300
committerWolfy-J <[email protected]>2018-06-10 13:36:59 +0300
commit549e357aca6e5bbfa0ff9a793f625f138445de2b (patch)
tree4734f0523d70640cfc7a85e83f7760f42d6e3f0e /service
parent16a347283b52f5cf997f9994cba79bba86a428c3 (diff)
merge candidates and configured
Diffstat (limited to 'service')
-rw-r--r--service/container.go195
-rw-r--r--service/entry.go48
2 files changed, 130 insertions, 113 deletions
diff --git a/service/container.go b/service/container.go
index fb1d6874..89acc90b 100644
--- a/service/container.go
+++ b/service/container.go
@@ -19,17 +19,18 @@ type Config interface {
// Container controls all internal RR services and provides plugin based system.
type Container interface {
- // Register add new service to the registry under given name.
+ // Register add new service to the container under given name.
Register(name string, service Service)
// Reconfigure configures all underlying services with given configuration.
Configure(cfg Config) error
- // Check is Service has been registered and configured.
+ // Check if svc has been registered.
Has(service string) bool
- // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance.
- Get(service string) Service
+ // Get returns svc instance by it's name or nil if svc not found. Method returns current service status
+ // as second value.
+ Get(service string) (svc Service, status int)
// Serve all configured services. Non blocking.
Serve() error
@@ -38,138 +39,96 @@ type Container interface {
Stop()
}
-// Service provides high level functionality for road runner Service.
+// svc provides high level functionality for road runner svc.
type Service interface {
- // Configure must return configure service and return true if service is enabled. Must return error in case of
+ // Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
Configure(cfg Config, reg Container) (enabled bool, err error)
- // Serve serves Service.
+ // Serve serves svc.
Serve() error
- // Close setStopped Service Service.
+ // Close setStopped svc svc.
Stop()
}
-type registry struct {
- log logrus.FieldLogger
- mu sync.Mutex
- candidates []*entry
- configured []*entry
+type container struct {
+ log logrus.FieldLogger
+ mu sync.Mutex
+ services []*entry
}
-// entry creates association between service instance and given name.
-type entry struct {
- // Associated service name
- Name string
-
- // Associated service instance
- Service Service
-
- // serving indicates that service is currently serving, todo: needs mutex
- mu sync.Mutex
- serving bool
-}
-
-// serving returns true if service is serving.
-func (e *entry) isServing() bool {
- e.mu.Lock()
- defer e.mu.Unlock()
-
- return e.serving
-}
-
-// setStarted indicates that service is serving.
-func (e *entry) setStarted() {
- e.mu.Lock()
- defer e.mu.Unlock()
-
- e.serving = true
-}
-
-// setStopped indicates that service is being stopped.
-func (e *entry) setStopped() {
- e.mu.Lock()
- defer e.mu.Unlock()
-
- e.serving = false
-}
-
-// NewRegistry creates new registry.
-func NewRegistry(log logrus.FieldLogger) Container {
- return &registry{
- log: log,
- candidates: make([]*entry, 0),
+// NewContainer creates new service container.
+func NewContainer(log logrus.FieldLogger) Container {
+ return &container{
+ log: log,
+ services: make([]*entry, 0),
}
}
-// Register add new service to the registry under given name.
-func (r *registry) Register(name string, service Service) {
+// Register add new service to the container under given name.
+func (r *container) Register(name string, service Service) {
r.mu.Lock()
defer r.mu.Unlock()
- r.candidates = append(r.candidates, &entry{
- Name: name,
- Service: service,
- serving: false,
+ r.services = append(r.services, &entry{
+ name: name,
+ svc: service,
+ status: StatusConfigured,
})
r.log.Debugf("%s.service: registered", name)
}
-// Reconfigure configures all underlying services with given configuration.
-func (r *registry) Configure(cfg Config) error {
- if r.configured != nil {
- return fmt.Errorf("service bus has been already configured")
- }
-
- r.configured = make([]*entry, 0)
- for _, e := range r.candidates {
- segment := cfg.Get(e.Name)
- if segment == nil {
- r.log.Debugf("%s.service: no config has been provided", e.Name)
- continue
- }
+// Check hasStatus svc has been registered.
+func (r *container) Has(target string) bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
- _, err := e.Service.Configure(segment, r)
- if err != nil {
- return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name))
+ for _, e := range r.services {
+ if e.name == target {
+ return true
}
-
- //if s != nil {
- // r.configured = append(r.configured, &entry{
- // Name: e.Name,
- // Service: s,
- // serving: false,
- // })
- //}
}
- return nil
+ return false
}
-// Check is Service has been registered.
-func (r *registry) Has(service string) bool {
+// Get returns svc instance by it's name or nil if svc not found.
+func (r *container) Get(target string) (svc Service, status int) {
r.mu.Lock()
defer r.mu.Unlock()
- for _, e := range r.configured {
- if e.Name == service {
- return true
+ for _, e := range r.services {
+ if e.name == target {
+ return e.svc, e.getStatus()
}
}
- return false
+ return nil, StatusUndefined
}
-// Get returns Service instance by it's Name or nil if Service not found.
-func (r *registry) Get(service string) Service {
+// Reconfigure configures all underlying services with given configuration.
+func (r *container) Configure(cfg Config) error {
r.mu.Lock()
defer r.mu.Unlock()
- for _, e := range r.configured {
- if e.Name == service {
- return e.Service
+ for _, e := range r.services {
+ if e.getStatus() >= StatusConfigured {
+ return fmt.Errorf("service %s has already been configured", e.name)
+ }
+
+ segment := cfg.Get(e.name)
+ if segment == nil {
+ r.log.Debugf("%s.service: no config has been provided", e.name)
+ continue
+ }
+
+ ok, err := e.svc.Configure(segment, r)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("%s.service", e.name))
+ } else if ok {
+ e.setStatus(StatusConfigured)
}
}
@@ -177,26 +136,33 @@ func (r *registry) Get(service string) Service {
}
// Serve all configured services. Non blocking.
-func (r *registry) Serve() error {
- if len(r.configured) == 0 {
- return errors.New("no services attached")
- }
-
- done := make(chan interface{}, len(r.configured))
+func (r *container) Serve() error {
+ var (
+ numServing int
+ done = make(chan interface{}, len(r.services))
+ )
defer close(done)
- for _, s := range r.configured {
+ r.mu.Lock()
+ for _, e := range r.services {
+ if e.hasStatus(StatusConfigured) {
+ numServing ++
+ } else {
+ continue
+ }
+
go func(s *entry) {
- defer s.setStopped()
+ s.setStatus(StatusServing)
+ defer s.setStatus(StatusStopped)
- s.setStarted()
- if err := s.Service.Serve(); err != nil {
+ if err := s.svc.Serve(); err != nil {
done <- err
}
- }(s)
+ }(e)
}
+ r.mu.Unlock()
- for i := 0; i < len(r.configured); i++ {
+ for i := 0; i < numServing; i++ {
result := <-done
// found an error in one of the services, stopping the rest of running services.
@@ -210,10 +176,13 @@ func (r *registry) Serve() error {
}
// Stop sends stop command to all running services.
-func (r *registry) Stop() {
- for _, s := range r.configured {
- if s.isServing() {
- s.Service.Stop()
+func (r *container) Stop() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, e := range r.services {
+ if e.hasStatus(StatusServing) {
+ e.svc.Stop()
}
}
}
diff --git a/service/entry.go b/service/entry.go
new file mode 100644
index 00000000..cf44349c
--- /dev/null
+++ b/service/entry.go
@@ -0,0 +1,48 @@
+package service
+
+import "sync"
+
+const (
+ // StatusUndefined when service bus can not find the service.
+ StatusUndefined = iota
+
+ // StatusRegistered hasStatus setStatus when service has been registered in container.
+ StatusRegistered
+
+ // StatusConfigured hasStatus setStatus when service has been properly configured.
+ StatusConfigured
+
+ // StatusServing hasStatus setStatus when service hasStatus currently serving.
+ StatusServing
+
+ // StatusStopped hasStatus setStatus when service hasStatus stopped.
+ StatusStopped
+)
+
+// entry creates association between service instance and given name.
+type entry struct {
+ name string
+ svc Service
+ mu sync.Mutex
+ status int
+}
+
+// status returns service status
+func (e *entry) getStatus() int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ return e.status
+}
+
+// setStarted indicates that service hasStatus status.
+func (e *entry) setStatus(status int) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.status = status
+}
+
+// hasStatus checks if entry in specific status
+func (e *entry) hasStatus(status int) bool {
+ return e.getStatus() == status
+}