diff options
author | Wolfy-J <[email protected]> | 2018-06-07 17:32:13 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-07 17:32:13 +0300 |
commit | 1871706eee3aa9e6b534372fbf2817e317f25a92 (patch) | |
tree | b57380f49ef83785a621e6232e84b599d26d1bae | |
parent | e2ccf9717ca11bbcf8e9b8ee5332e3211d38cfa9 (diff) |
serving
-rw-r--r-- | .travis.yml | 2 | ||||
-rw-r--r-- | rpc/service.go | 4 | ||||
-rw-r--r-- | service/registry.go | 81 |
3 files changed, 70 insertions, 17 deletions
diff --git a/.travis.yml b/.travis.yml index f7108a84..5ea8b20d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ install: - go get -u "github.com/stretchr/testify/assert" script: - - go test ./... -race -v -coverprofile=coverage.txt -covermode=atomic + - go test -race -v -coverprofile=coverage.txt -covermode=atomic after_success: - bash <(curl -s https://codecov.io/bash) diff --git a/rpc/service.go b/rpc/service.go index b461bdc2..2dfb04b0 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -63,9 +63,9 @@ func (s *Service) Serve() error { } // Close stop Service Service. -func (s *Service) Stop() error { +func (s *Service) Stop() { + //todo: is started? close(s.stop) - return nil } // Register publishes in the server the set of methods of the diff --git a/service/registry.go b/service/registry.go index 0182fa93..424ad328 100644 --- a/service/registry.go +++ b/service/registry.go @@ -2,9 +2,9 @@ package service import ( "fmt" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "sync" + "github.com/pkg/errors" ) // Config provides ability to slice configuration sections and unmarshal configuration data into @@ -35,7 +35,7 @@ type Registry interface { Serve() error // Close all active services. - Stop() error + Stop() } // Service provides high level functionality for road runner Service. @@ -47,8 +47,8 @@ type Service interface { // Serve serves Service. Serve() error - // Close stop Service Service. - Stop() error + // Close setStopped Service Service. + Stop() } type registry struct { @@ -66,8 +66,33 @@ type entry struct { // Associated service instance Service Service - // Serving indicates that service is currently serving - Serving bool + // serving indicates that service is currently serving, todo: needs mutex + mu sync.Mutex + serving bool +} + +// serving returns true if service is serving. +func (e *entry) isServing() bool { + e.mu.Lock() + defer e.mu.Unlock() + + return e.serving +} + +// setStarted indicates that service is serving. +func (e *entry) setStarted() { + e.mu.Lock() + defer e.mu.Unlock() + + e.serving = true +} + +// setStopped indicates that service is being stopped. +func (e *entry) setStopped() { + e.mu.Lock() + defer e.mu.Unlock() + + e.serving = false } // NewRegistry creates new registry. @@ -86,7 +111,7 @@ func (r *registry) Register(name string, service Service) { r.candidates = append(r.candidates, &entry{ Name: name, Service: service, - Serving: false, + serving: false, }) r.log.Debugf("%s.service: registered", name) @@ -115,7 +140,7 @@ func (r *registry) Configure(cfg Config) error { r.configured = append(r.configured, &entry{ Name: e.Name, Service: s, - Serving: false, + serving: false, }) } } @@ -153,14 +178,42 @@ func (r *registry) Get(service string) Service { // Serve all configured services. Non blocking. func (r *registry) Serve() error { - // todo: serving + if len(r.configured) == 0 { + return errors.New("no services attached") + } - return nil -} + done := make(chan interface{}, len(r.configured)) + defer close(done) + + for _, s := range r.configured { + go func(s *entry) { + defer s.setStopped() + + s.setStarted() + if err := s.Service.Serve(); err != nil { + done <- err + } + }(s) + } + + for i := 0; i < len(r.configured); i++ { + result := <-done -// Close all active services. -func (r *registry) Stop() error { - // todo: stopping + // found an error in one of the services, stopping the rest of running services. + if err, ok := result.(error); ok { + r.Stop() + return err + } + } return nil } + +// Stop sends stop command to all running services. +func (r *registry) Stop() { + for _, s := range r.configured { + if s.isServing() { + s.Service.Stop() + } + } +} |