summaryrefslogtreecommitdiff
path: root/service/bus.go
diff options
context:
space:
mode:
Diffstat (limited to 'service/bus.go')
-rw-r--r--service/bus.go165
1 files changed, 165 insertions, 0 deletions
diff --git a/service/bus.go b/service/bus.go
new file mode 100644
index 00000000..40e73c31
--- /dev/null
+++ b/service/bus.go
@@ -0,0 +1,165 @@
+package service
+
+import (
+ "github.com/sirupsen/logrus"
+ "net/rpc"
+ "sync"
+ "github.com/spiral/goridge"
+ "github.com/pkg/errors"
+)
+
+const (
+ rpcConfig = "rpc"
+)
+
+var (
+ dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+)
+
+type Bus struct {
+ wg sync.WaitGroup
+ services []Service
+ enabled []Service
+ stop chan interface{}
+ rpc *rpc.Server
+ rpcConfig *RPCConfig
+}
+
+func (b *Bus) Register(s Service) {
+ b.services = append(b.services, s)
+}
+
+func (b *Bus) Services() []Service {
+ return b.services
+}
+
+func (b *Bus) Configure(cfg Config) error {
+ if segment := cfg.Get(rpcConfig); segment == nil {
+ logrus.Warn("rpc: no config has been provided")
+ } else {
+ b.rpcConfig = &RPCConfig{}
+ if err := segment.Unmarshal(b.rpcConfig); err != nil {
+ return err
+ }
+ }
+
+ b.enabled = make([]Service, 0)
+
+ for _, s := range b.services {
+ segment := cfg.Get(s.Name())
+ if segment == nil {
+ // no config has been provided for the service
+ logrus.Debugf("%s: no config has been provided", s.Name())
+ continue
+ }
+
+ if enable, err := s.Configure(segment); err != nil {
+ return err
+ } else if !enable {
+ continue
+ }
+
+ b.enabled = append(b.enabled, s)
+ }
+
+ return nil
+}
+
+func (b *Bus) RCPClient() (*rpc.Client, error) {
+ if b.rpcConfig == nil {
+ return nil, errors.New("rpc is not configured")
+ }
+
+ conn, err := b.rpcConfig.CreateDialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
+
+func (b *Bus) Serve() {
+ b.rpc = rpc.NewServer()
+
+ for _, s := range b.enabled {
+ // some services might provide net/rpc api for internal communications
+ if api := s.RPC(); api != nil {
+ b.rpc.RegisterName(s.Name(), api)
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ if err := s.Serve(); err != nil {
+ logrus.Errorf("%s.start: ", s.Name(), err)
+ }
+ }()
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ logrus.Debug("rpc: started")
+ if err := b.serveRPC(); err != nil {
+ logrus.Errorf("rpc: %s", err)
+ }
+ }()
+
+ b.wg.Wait()
+}
+
+func (b *Bus) Stop() {
+ if err := b.stopRPC(); err != nil {
+ logrus.Errorf("rpc: ", err)
+ }
+
+ for _, s := range b.enabled {
+ if err := s.Stop(); err != nil {
+ logrus.Errorf("%s.stop: ", s.Name(), err)
+ }
+ }
+
+ b.wg.Wait()
+}
+
+func (b *Bus) serveRPC() error {
+ if b.rpcConfig == nil {
+ return nil
+ }
+
+ b.stop = make(chan interface{})
+
+ ln, err := b.rpcConfig.CreateListener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ for {
+ select {
+ case <-b.stop:
+ b.rpc = nil
+ return nil
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go b.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+
+ return nil
+}
+
+func (b *Bus) stopRPC() error {
+ if b.rpcConfig == nil {
+ return nil
+ }
+
+ close(b.stop)
+ return nil
+}