diff options
Diffstat (limited to 'service/bus.go')
-rw-r--r-- | service/bus.go | 165 |
1 files changed, 0 insertions, 165 deletions
diff --git a/service/bus.go b/service/bus.go deleted file mode 100644 index 8bfb914c..00000000 --- a/service/bus.go +++ /dev/null @@ -1,165 +0,0 @@ -package service - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spiral/goridge" - "net/rpc" - "sync" -) - -const ( - rpcConfig = "rpc" -) - -var ( - dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") -) - -type Bus struct { - wg sync.WaitGroup - services []Service - enabled []Service - stop chan interface{} - rpc *rpc.Server - rpcConfig *RPCConfig -} - -func (b *Bus) Register(s Service) { - b.services = append(b.services, s) -} - -func (b *Bus) Services() []Service { - return b.services -} - -func (b *Bus) Configure(cfg Config) error { - if segment := cfg.Get(rpcConfig); segment == nil { - logrus.Warn("rpc: no config has been provided") - } else { - b.rpcConfig = &RPCConfig{} - if err := segment.Unmarshal(b.rpcConfig); err != nil { - return err - } - } - - b.enabled = make([]Service, 0) - - for _, s := range b.services { - segment := cfg.Get(s.Name()) - if segment == nil { - // no config has been provided for the service - logrus.Debugf("%s: no config has been provided", s.Name()) - continue - } - - if enable, err := s.Configure(segment); err != nil { - return err - } else if !enable { - continue - } - - b.enabled = append(b.enabled, s) - } - - return nil -} - -func (b *Bus) RCPClient() (*rpc.Client, error) { - if b.rpcConfig == nil { - return nil, errors.New("rpc is not configured") - } - - conn, err := b.rpcConfig.CreateDialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} - -func (b *Bus) Serve() { - b.rpc = rpc.NewServer() - - for _, s := range b.enabled { - // some services might provide net/rpc api for internal communications - if api := s.RPC(); api != nil { - b.rpc.RegisterName(s.Name(), api) - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - if err := s.Serve(); err != nil { - logrus.Errorf("%s.start: %s", s.Name(), err) - } - }() - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - logrus.Debug("rpc: started") - if err := b.serveRPC(); err != nil { - logrus.Errorf("rpc: %s", err) - } - }() - - b.wg.Wait() -} - -func (b *Bus) Stop() { - if err := b.stopRPC(); err != nil { - logrus.Errorf("rpc: ", err) - } - - for _, s := range b.enabled { - if err := s.Stop(); err != nil { - logrus.Errorf("%s.stop: %s", s.Name(), err) - } - } - - b.wg.Wait() -} - -func (b *Bus) serveRPC() error { - if b.rpcConfig == nil { - return nil - } - - b.stop = make(chan interface{}) - - ln, err := b.rpcConfig.CreateListener() - if err != nil { - return err - } - defer ln.Close() - - for { - select { - case <-b.stop: - b.rpc = nil - return nil - default: - conn, err := ln.Accept() - if err != nil { - continue - } - - go b.rpc.ServeCodec(goridge.NewCodec(conn)) - } - } - - return nil -} - -func (b *Bus) stopRPC() error { - if b.rpcConfig == nil { - return nil - } - - close(b.stop) - return nil -} |