diff options
author | Valery Piashchynski <[email protected]> | 2020-10-26 12:01:53 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-10-26 12:01:53 +0300 |
commit | 91cf918b30938129609323ded53e190385e019a6 (patch) | |
tree | 0ad9537bd438c63719fb83343ab77fc4ab34eb83 /plugins/rpc/rpc.go | |
parent | 68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff) | |
parent | 9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff) |
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'plugins/rpc/rpc.go')
-rwxr-xr-x | plugins/rpc/rpc.go | 82 |
1 files changed, 40 insertions, 42 deletions
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index 6568eea3..0f6c9753 100755 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,21 +1,24 @@ package rpc import ( - "errors" + "net/rpc" + "github.com/spiral/endure" + "github.com/spiral/endure/errors" "github.com/spiral/goridge/v2" "github.com/spiral/roadrunner/v2/plugins/config" - - "net/rpc" ) -type PluginRpc interface { - Name() string - RpcService() (interface{}, error) +// RPCPluggable declares the ability to create set of public RPC methods. +type RPCPluggable interface { + endure.Named + + // Provides RPC methods for the given service. + RPCService() (interface{}, error) } -// ID contains default service name. -const ID = "rpc" +// ServiceName contains default service name. +const ServiceName = "rpc" type services struct { service interface{} @@ -24,52 +27,48 @@ type services struct { // Service is RPC service. type Service struct { - // TODO do we need a pointer here since all receivers are pointers?? - rpc *rpc.Server - configProvider config.Provider - services []services - config Config - close chan struct{} + rpc *rpc.Server + services []services + config Config + close chan struct{} } // Init rpc service. Must return true if service is enabled. func (s *Service) Init(cfg config.Provider) error { - s.configProvider = cfg - err := s.configProvider.UnmarshalKey(ID, &s.config) + if !cfg.Has(ServiceName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(ServiceName, &s.config) if err != nil { return err } + s.config.InitDefaults() - // TODO Do we need to init defaults - if s.config.Listen == "" { - s.config.InitDefaults() + if s.config.Disabled { + return errors.E(errors.Disabled) } - s.close = make(chan struct{}) + return s.config.Valid() +} - return nil +// Name contains service name. +func (s *Service) Name() string { + return ServiceName } // Serve serves the service. func (s *Service) Serve() chan error { + s.close = make(chan struct{}, 1) errCh := make(chan error, 1) - server := rpc.NewServer() - if server == nil { - errCh <- errors.New("rpc server is nil") - return errCh - } - s.rpc = server - if len(s.services) == 0 { - errCh <- errors.New("no services with RPC") - return errCh - } + s.rpc = rpc.NewServer() // Attach all services for i := 0; i < len(s.services); i++ { err := s.Register(s.services[i].name, s.services[i].service) if err != nil { - errCh <- err + errCh <- errors.E(errors.Op("register service"), err) return errCh } } @@ -85,7 +84,10 @@ func (s *Service) Serve() chan error { select { case <-s.close: // log error - errCh <- ln.Close() + err := ln.Close() + if err != nil { + errCh <- errors.E(errors.Op("close RPC socket"), err) + } return default: conn, err := ln.Accept() @@ -98,7 +100,7 @@ func (s *Service) Serve() chan error { } }() - return nil + return errCh } // Stop stops the service. @@ -109,12 +111,12 @@ func (s *Service) Stop() error { func (s *Service) Depends() []interface{} { return []interface{}{ - s.RpcService, + s.RegisterService, } } -func (s *Service) RpcService(p PluginRpc) error { - service, err := p.RpcService() +func (s *Service) RegisterService(p RPCPluggable) error { + service, err := p.RPCService() if err != nil { return err } @@ -136,7 +138,7 @@ func (s *Service) RpcService(p PluginRpc) error { // no suitable methods. It also logs the error using package log. func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { - return errors.New("RPC service is not configured") + return errors.E("RPC service is not configured") } return s.rpc.RegisterName(name, svc) @@ -144,10 +146,6 @@ func (s *Service) Register(name string, svc interface{}) error { // Client creates new RPC client. func (s *Service) Client() (*rpc.Client, error) { - if s.configProvider == nil { - return nil, errors.New("RPC service is not configured") - } - conn, err := s.config.Dialer() if err != nil { return nil, err |