summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
Diffstat (limited to 'service')
-rw-r--r--service/registry.go81
1 files changed, 67 insertions, 14 deletions
diff --git a/service/registry.go b/service/registry.go
index 0182fa93..424ad328 100644
--- a/service/registry.go
+++ b/service/registry.go
@@ -2,9 +2,9 @@ package service
import (
"fmt"
- "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"sync"
+ "github.com/pkg/errors"
)
// Config provides ability to slice configuration sections and unmarshal configuration data into
@@ -35,7 +35,7 @@ type Registry interface {
Serve() error
// Close all active services.
- Stop() error
+ Stop()
}
// Service provides high level functionality for road runner Service.
@@ -47,8 +47,8 @@ type Service interface {
// Serve serves Service.
Serve() error
- // Close stop Service Service.
- Stop() error
+ // Close setStopped Service Service.
+ Stop()
}
type registry struct {
@@ -66,8 +66,33 @@ type entry struct {
// Associated service instance
Service Service
- // Serving indicates that service is currently serving
- Serving bool
+ // serving indicates that service is currently serving, todo: needs mutex
+ mu sync.Mutex
+ serving bool
+}
+
+// serving returns true if service is serving.
+func (e *entry) isServing() bool {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ return e.serving
+}
+
+// setStarted indicates that service is serving.
+func (e *entry) setStarted() {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ e.serving = true
+}
+
+// setStopped indicates that service is being stopped.
+func (e *entry) setStopped() {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ e.serving = false
}
// NewRegistry creates new registry.
@@ -86,7 +111,7 @@ func (r *registry) Register(name string, service Service) {
r.candidates = append(r.candidates, &entry{
Name: name,
Service: service,
- Serving: false,
+ serving: false,
})
r.log.Debugf("%s.service: registered", name)
@@ -115,7 +140,7 @@ func (r *registry) Configure(cfg Config) error {
r.configured = append(r.configured, &entry{
Name: e.Name,
Service: s,
- Serving: false,
+ serving: false,
})
}
}
@@ -153,14 +178,42 @@ func (r *registry) Get(service string) Service {
// Serve all configured services. Non blocking.
func (r *registry) Serve() error {
- // todo: serving
+ if len(r.configured) == 0 {
+ return errors.New("no services attached")
+ }
- return nil
-}
+ done := make(chan interface{}, len(r.configured))
+ defer close(done)
+
+ for _, s := range r.configured {
+ go func(s *entry) {
+ defer s.setStopped()
+
+ s.setStarted()
+ if err := s.Service.Serve(); err != nil {
+ done <- err
+ }
+ }(s)
+ }
+
+ for i := 0; i < len(r.configured); i++ {
+ result := <-done
-// Close all active services.
-func (r *registry) Stop() error {
- // todo: stopping
+ // found an error in one of the services, stopping the rest of running services.
+ if err, ok := result.(error); ok {
+ r.Stop()
+ return err
+ }
+ }
return nil
}
+
+// Stop sends stop command to all running services.
+func (r *registry) Stop() {
+ for _, s := range r.configured {
+ if s.isServing() {
+ s.Service.Stop()
+ }
+ }
+}