diff options
author | Wolfy-J <[email protected]> | 2018-06-10 13:36:59 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-10 13:36:59 +0300 |
commit | 549e357aca6e5bbfa0ff9a793f625f138445de2b (patch) | |
tree | 4734f0523d70640cfc7a85e83f7760f42d6e3f0e /service/container.go | |
parent | 16a347283b52f5cf997f9994cba79bba86a428c3 (diff) |
merge candidates and configured
Diffstat (limited to 'service/container.go')
-rw-r--r-- | service/container.go | 195 |
1 files changed, 82 insertions, 113 deletions
diff --git a/service/container.go b/service/container.go index fb1d6874..89acc90b 100644 --- a/service/container.go +++ b/service/container.go @@ -19,17 +19,18 @@ type Config interface { // Container controls all internal RR services and provides plugin based system. type Container interface { - // Register add new service to the registry under given name. + // Register add new service to the container under given name. Register(name string, service Service) // Reconfigure configures all underlying services with given configuration. Configure(cfg Config) error - // Check is Service has been registered and configured. + // Check if svc has been registered. Has(service string) bool - // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance. - Get(service string) Service + // Get returns svc instance by it's name or nil if svc not found. Method returns current service status + // as second value. + Get(service string) (svc Service, status int) // Serve all configured services. Non blocking. Serve() error @@ -38,138 +39,96 @@ type Container interface { Stop() } -// Service provides high level functionality for road runner Service. +// svc provides high level functionality for road runner svc. type Service interface { - // Configure must return configure service and return true if service is enabled. Must return error in case of + // Configure must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. Configure(cfg Config, reg Container) (enabled bool, err error) - // Serve serves Service. + // Serve serves svc. Serve() error - // Close setStopped Service Service. + // Close setStopped svc svc. Stop() } -type registry struct { - log logrus.FieldLogger - mu sync.Mutex - candidates []*entry - configured []*entry +type container struct { + log logrus.FieldLogger + mu sync.Mutex + services []*entry } -// entry creates association between service instance and given name. -type entry struct { - // Associated service name - Name string - - // Associated service instance - Service Service - - // serving indicates that service is currently serving, todo: needs mutex - mu sync.Mutex - serving bool -} - -// serving returns true if service is serving. -func (e *entry) isServing() bool { - e.mu.Lock() - defer e.mu.Unlock() - - return e.serving -} - -// setStarted indicates that service is serving. -func (e *entry) setStarted() { - e.mu.Lock() - defer e.mu.Unlock() - - e.serving = true -} - -// setStopped indicates that service is being stopped. -func (e *entry) setStopped() { - e.mu.Lock() - defer e.mu.Unlock() - - e.serving = false -} - -// NewRegistry creates new registry. -func NewRegistry(log logrus.FieldLogger) Container { - return ®istry{ - log: log, - candidates: make([]*entry, 0), +// NewContainer creates new service container. +func NewContainer(log logrus.FieldLogger) Container { + return &container{ + log: log, + services: make([]*entry, 0), } } -// Register add new service to the registry under given name. -func (r *registry) Register(name string, service Service) { +// Register add new service to the container under given name. +func (r *container) Register(name string, service Service) { r.mu.Lock() defer r.mu.Unlock() - r.candidates = append(r.candidates, &entry{ - Name: name, - Service: service, - serving: false, + r.services = append(r.services, &entry{ + name: name, + svc: service, + status: StatusConfigured, }) r.log.Debugf("%s.service: registered", name) } -// Reconfigure configures all underlying services with given configuration. -func (r *registry) Configure(cfg Config) error { - if r.configured != nil { - return fmt.Errorf("service bus has been already configured") - } - - r.configured = make([]*entry, 0) - for _, e := range r.candidates { - segment := cfg.Get(e.Name) - if segment == nil { - r.log.Debugf("%s.service: no config has been provided", e.Name) - continue - } +// Check hasStatus svc has been registered. +func (r *container) Has(target string) bool { + r.mu.Lock() + defer r.mu.Unlock() - _, err := e.Service.Configure(segment, r) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name)) + for _, e := range r.services { + if e.name == target { + return true } - - //if s != nil { - // r.configured = append(r.configured, &entry{ - // Name: e.Name, - // Service: s, - // serving: false, - // }) - //} } - return nil + return false } -// Check is Service has been registered. -func (r *registry) Has(service string) bool { +// Get returns svc instance by it's name or nil if svc not found. +func (r *container) Get(target string) (svc Service, status int) { r.mu.Lock() defer r.mu.Unlock() - for _, e := range r.configured { - if e.Name == service { - return true + for _, e := range r.services { + if e.name == target { + return e.svc, e.getStatus() } } - return false + return nil, StatusUndefined } -// Get returns Service instance by it's Name or nil if Service not found. -func (r *registry) Get(service string) Service { +// Reconfigure configures all underlying services with given configuration. +func (r *container) Configure(cfg Config) error { r.mu.Lock() defer r.mu.Unlock() - for _, e := range r.configured { - if e.Name == service { - return e.Service + for _, e := range r.services { + if e.getStatus() >= StatusConfigured { + return fmt.Errorf("service %s has already been configured", e.name) + } + + segment := cfg.Get(e.name) + if segment == nil { + r.log.Debugf("%s.service: no config has been provided", e.name) + continue + } + + ok, err := e.svc.Configure(segment, r) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("%s.service", e.name)) + } else if ok { + e.setStatus(StatusConfigured) } } @@ -177,26 +136,33 @@ func (r *registry) Get(service string) Service { } // Serve all configured services. Non blocking. -func (r *registry) Serve() error { - if len(r.configured) == 0 { - return errors.New("no services attached") - } - - done := make(chan interface{}, len(r.configured)) +func (r *container) Serve() error { + var ( + numServing int + done = make(chan interface{}, len(r.services)) + ) defer close(done) - for _, s := range r.configured { + r.mu.Lock() + for _, e := range r.services { + if e.hasStatus(StatusConfigured) { + numServing ++ + } else { + continue + } + go func(s *entry) { - defer s.setStopped() + s.setStatus(StatusServing) + defer s.setStatus(StatusStopped) - s.setStarted() - if err := s.Service.Serve(); err != nil { + if err := s.svc.Serve(); err != nil { done <- err } - }(s) + }(e) } + r.mu.Unlock() - for i := 0; i < len(r.configured); i++ { + for i := 0; i < numServing; i++ { result := <-done // found an error in one of the services, stopping the rest of running services. @@ -210,10 +176,13 @@ func (r *registry) Serve() error { } // Stop sends stop command to all running services. -func (r *registry) Stop() { - for _, s := range r.configured { - if s.isServing() { - s.Service.Stop() +func (r *container) Stop() { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.services { + if e.hasStatus(StatusServing) { + e.svc.Stop() } } } |