From 1871706eee3aa9e6b534372fbf2817e317f25a92 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Thu, 7 Jun 2018 17:32:13 +0300 Subject: serving --- service/registry.go | 81 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 14 deletions(-) (limited to 'service') diff --git a/service/registry.go b/service/registry.go index 0182fa93..424ad328 100644 --- a/service/registry.go +++ b/service/registry.go @@ -2,9 +2,9 @@ package service import ( "fmt" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "sync" + "github.com/pkg/errors" ) // Config provides ability to slice configuration sections and unmarshal configuration data into @@ -35,7 +35,7 @@ type Registry interface { Serve() error // Close all active services. - Stop() error + Stop() } // Service provides high level functionality for road runner Service. @@ -47,8 +47,8 @@ type Service interface { // Serve serves Service. Serve() error - // Close stop Service Service. - Stop() error + // Close setStopped Service Service. + Stop() } type registry struct { @@ -66,8 +66,33 @@ type entry struct { // Associated service instance Service Service - // Serving indicates that service is currently serving - Serving bool + // serving indicates that service is currently serving, todo: needs mutex + mu sync.Mutex + serving bool +} + +// serving returns true if service is serving. +func (e *entry) isServing() bool { + e.mu.Lock() + defer e.mu.Unlock() + + return e.serving +} + +// setStarted indicates that service is serving. +func (e *entry) setStarted() { + e.mu.Lock() + defer e.mu.Unlock() + + e.serving = true +} + +// setStopped indicates that service is being stopped. +func (e *entry) setStopped() { + e.mu.Lock() + defer e.mu.Unlock() + + e.serving = false } // NewRegistry creates new registry. @@ -86,7 +111,7 @@ func (r *registry) Register(name string, service Service) { r.candidates = append(r.candidates, &entry{ Name: name, Service: service, - Serving: false, + serving: false, }) r.log.Debugf("%s.service: registered", name) @@ -115,7 +140,7 @@ func (r *registry) Configure(cfg Config) error { r.configured = append(r.configured, &entry{ Name: e.Name, Service: s, - Serving: false, + serving: false, }) } } @@ -153,14 +178,42 @@ func (r *registry) Get(service string) Service { // Serve all configured services. Non blocking. func (r *registry) Serve() error { - // todo: serving + if len(r.configured) == 0 { + return errors.New("no services attached") + } - return nil -} + done := make(chan interface{}, len(r.configured)) + defer close(done) + + for _, s := range r.configured { + go func(s *entry) { + defer s.setStopped() + + s.setStarted() + if err := s.Service.Serve(); err != nil { + done <- err + } + }(s) + } + + for i := 0; i < len(r.configured); i++ { + result := <-done -// Close all active services. -func (r *registry) Stop() error { - // todo: stopping + // found an error in one of the services, stopping the rest of running services. + if err, ok := result.(error); ok { + r.Stop() + return err + } + } return nil } + +// Stop sends stop command to all running services. +func (r *registry) Stop() { + for _, s := range r.configured { + if s.isServing() { + s.Service.Stop() + } + } +} -- cgit v1.2.3