diff options
author | Wolfy-J <[email protected]> | 2020-10-28 13:03:32 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2020-10-28 13:03:32 +0300 |
commit | 126026f11f9b0108d80a3eb46097aabf9b31aa05 (patch) | |
tree | 61eb68871b69638bc523c5b7f910d6999bc4539e /plugins | |
parent | 2d3349eee632e7357ed1eb6905444194a28a4ec0 (diff) |
- added RPC logging
Diffstat (limited to 'plugins')
-rwxr-xr-x | plugins/rpc/config.go | 6 | ||||
-rwxr-xr-x | plugins/rpc/rpc.go | 67 |
2 files changed, 39 insertions, 34 deletions
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index 719fd5e3..1a599695 100755 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -8,7 +8,7 @@ import ( "github.com/spiral/roadrunner/v2/util" ) -// Config defines RPC service config. +// Config defines RPC service cfg. type Config struct { // Listen string Listen string @@ -17,14 +17,14 @@ type Config struct { Disabled bool } -// InitDefaults allows to init blank config with pre-defined set of default values. +// InitDefaults allows to init blank cfg with pre-defined set of default values. func (c *Config) InitDefaults() { if c.Listen == "" { c.Listen = "tcp://127.0.0.1:6001" } } -// Valid returns nil if config is valid. +// Valid returns nil if cfg is valid. func (c *Config) Valid() error { if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index 0f6c9753..894c89d8 100755 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,6 +1,7 @@ package rpc import ( + "go.uber.org/zap" "net/rpc" "github.com/spiral/endure" @@ -20,65 +21,70 @@ type RPCPluggable interface { // ServiceName contains default service name. const ServiceName = "rpc" -type services struct { - service interface{} - name string -} - // Service is RPC service. type Service struct { + cfg Config + log *zap.Logger rpc *rpc.Server - services []services - config Config + services []RPCPluggable close chan struct{} } // Init rpc service. Must return true if service is enabled. -func (s *Service) Init(cfg config.Provider) error { +func (s *Service) Init(cfg config.Provider, log *zap.Logger) error { if !cfg.Has(ServiceName) { return errors.E(errors.Disabled) } - err := cfg.UnmarshalKey(ServiceName, &s.config) + err := cfg.UnmarshalKey(ServiceName, &s.cfg) if err != nil { return err } - s.config.InitDefaults() + s.cfg.InitDefaults() - if s.config.Disabled { + if s.cfg.Disabled { return errors.E(errors.Disabled) } - return s.config.Valid() -} + s.log = log -// Name contains service name. -func (s *Service) Name() string { - return ServiceName + return s.cfg.Valid() } // Serve serves the service. func (s *Service) Serve() chan error { - s.close = make(chan struct{}, 1) errCh := make(chan error, 1) + s.close = make(chan struct{}, 1) s.rpc = rpc.NewServer() + names := make([]string, 0, len(s.services)) + // Attach all services for i := 0; i < len(s.services); i++ { - err := s.Register(s.services[i].name, s.services[i].service) + svc, err := s.services[i].RPCService() if err != nil { errCh <- errors.E(errors.Op("register service"), err) return errCh } + + err = s.Register(s.services[i].Name(), svc) + if err != nil { + errCh <- errors.E(errors.Op("register service"), err) + return errCh + } + + names = append(names, s.services[i].Name()) } - ln, err := s.config.Listener() + ln, err := s.cfg.Listener() if err != nil { errCh <- err return errCh } + s.log.Debug("Started RPC service", zap.String("address", s.cfg.Listen), zap.Any("services", names)) + go func() { for { select { @@ -109,22 +115,21 @@ func (s *Service) Stop() error { return nil } +// Name contains service name. +func (s *Service) Name() string { + return ServiceName +} + +// Depends declares services to collect for RPC. func (s *Service) Depends() []interface{} { return []interface{}{ - s.RegisterService, + s.RegisterPlugin, } } -func (s *Service) RegisterService(p RPCPluggable) error { - service, err := p.RPCService() - if err != nil { - return err - } - - s.services = append(s.services, services{ - service: service, - name: p.Name(), - }) +// RegisterPlugin registers RPC service plugin. +func (s *Service) RegisterPlugin(p RPCPluggable) error { + s.services = append(s.services, p) return nil } @@ -146,7 +151,7 @@ func (s *Service) Register(name string, svc interface{}) error { // Client creates new RPC client. func (s *Service) Client() (*rpc.Client, error) { - conn, err := s.config.Dialer() + conn, err := s.cfg.Dialer() if err != nil { return nil, err } |