diff options
Diffstat (limited to 'cmd/_____/bus.go')
-rw-r--r-- | cmd/_____/bus.go | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go new file mode 100644 index 00000000..813a6c3b --- /dev/null +++ b/cmd/_____/bus.go @@ -0,0 +1,94 @@ +package _____ + +import ( + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net/rpc" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +var ( + dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") +) + +type Bus struct { + services []Service + wg sync.WaitGroup + enabled []Service + stop chan interface{} + rpc *rpc.Server + rpcConfig *RPCConfig +} + +func (b *Bus) Register(s Service) { + b.services = append(b.services, s) +} + +func (b *Bus) Services() []Service { + return b.services +} + +func (b *Bus) Configure(cfg Config) error { + b.enabled = make([]Service, 0) + + for _, s := range b.services { + segment := cfg.Get(s.Name()) + if segment == nil { + // no config has been provided for the Service + logrus.Debugf("%s: no config has been provided", s.Name()) + continue + } + + if enable, err := s.Configure(segment); err != nil { + return err + } else if !enable { + continue + } + + b.enabled = append(b.enabled, s) + } + + return nil +} + +func (b *Bus) Serve() { + b.rpc = rpc.NewServer() + + for _, s := range b.enabled { + // some candidates might provide net/rpc api for internal communications + if api := s.RPC(); api != nil { + b.rpc.RegisterName(s.Name(), api) + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + if err := s.Serve(); err != nil { + logrus.Errorf("%s.start: %s", s.Name(), err) + } + }() + } + + b.wg.Wait() +} + +func (b *Bus) Stop() { + for _, s := range b.enabled { + if err := s.Stop(); err != nil { + logrus.Errorf("%s.stop: %s", s.Name(), err) + } + } + + b.wg.Wait() +} |