summaryrefslogtreecommitdiff
path: root/cmd/_____/bus.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/_____/bus.go')
-rw-r--r--cmd/_____/bus.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go
new file mode 100644
index 00000000..813a6c3b
--- /dev/null
+++ b/cmd/_____/bus.go
@@ -0,0 +1,94 @@
+package _____
+
+import (
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "net/rpc"
+ "sync"
+)
+
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // Get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+var (
+ dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+)
+
+type Bus struct {
+ services []Service
+ wg sync.WaitGroup
+ enabled []Service
+ stop chan interface{}
+ rpc *rpc.Server
+ rpcConfig *RPCConfig
+}
+
+func (b *Bus) Register(s Service) {
+ b.services = append(b.services, s)
+}
+
+func (b *Bus) Services() []Service {
+ return b.services
+}
+
+func (b *Bus) Configure(cfg Config) error {
+ b.enabled = make([]Service, 0)
+
+ for _, s := range b.services {
+ segment := cfg.Get(s.Name())
+ if segment == nil {
+ // no config has been provided for the Service
+ logrus.Debugf("%s: no config has been provided", s.Name())
+ continue
+ }
+
+ if enable, err := s.Configure(segment); err != nil {
+ return err
+ } else if !enable {
+ continue
+ }
+
+ b.enabled = append(b.enabled, s)
+ }
+
+ return nil
+}
+
+func (b *Bus) Serve() {
+ b.rpc = rpc.NewServer()
+
+ for _, s := range b.enabled {
+ // some candidates might provide net/rpc api for internal communications
+ if api := s.RPC(); api != nil {
+ b.rpc.RegisterName(s.Name(), api)
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ if err := s.Serve(); err != nil {
+ logrus.Errorf("%s.start: %s", s.Name(), err)
+ }
+ }()
+ }
+
+ b.wg.Wait()
+}
+
+func (b *Bus) Stop() {
+ for _, s := range b.enabled {
+ if err := s.Stop(); err != nil {
+ logrus.Errorf("%s.stop: %s", s.Name(), err)
+ }
+ }
+
+ b.wg.Wait()
+}