summaryrefslogtreecommitdiff
path: root/plugins/rpc/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-26 12:01:53 +0300
committerGitHub <[email protected]>2020-10-26 12:01:53 +0300
commit91cf918b30938129609323ded53e190385e019a6 (patch)
tree0ad9537bd438c63719fb83343ab77fc4ab34eb83 /plugins/rpc/rpc.go
parent68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff)
parent9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff)
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'plugins/rpc/rpc.go')
-rwxr-xr-xplugins/rpc/rpc.go82
1 files changed, 40 insertions, 42 deletions
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index 6568eea3..0f6c9753 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -1,21 +1,24 @@
package rpc
import (
- "errors"
+ "net/rpc"
+ "github.com/spiral/endure"
+ "github.com/spiral/endure/errors"
"github.com/spiral/goridge/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
-
- "net/rpc"
)
-type PluginRpc interface {
- Name() string
- RpcService() (interface{}, error)
+// RPCPluggable declares the ability to create set of public RPC methods.
+type RPCPluggable interface {
+ endure.Named
+
+ // Provides RPC methods for the given service.
+ RPCService() (interface{}, error)
}
-// ID contains default service name.
-const ID = "rpc"
+// ServiceName contains default service name.
+const ServiceName = "rpc"
type services struct {
service interface{}
@@ -24,52 +27,48 @@ type services struct {
// Service is RPC service.
type Service struct {
- // TODO do we need a pointer here since all receivers are pointers??
- rpc *rpc.Server
- configProvider config.Provider
- services []services
- config Config
- close chan struct{}
+ rpc *rpc.Server
+ services []services
+ config Config
+ close chan struct{}
}
// Init rpc service. Must return true if service is enabled.
func (s *Service) Init(cfg config.Provider) error {
- s.configProvider = cfg
- err := s.configProvider.UnmarshalKey(ID, &s.config)
+ if !cfg.Has(ServiceName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(ServiceName, &s.config)
if err != nil {
return err
}
+ s.config.InitDefaults()
- // TODO Do we need to init defaults
- if s.config.Listen == "" {
- s.config.InitDefaults()
+ if s.config.Disabled {
+ return errors.E(errors.Disabled)
}
- s.close = make(chan struct{})
+ return s.config.Valid()
+}
- return nil
+// Name contains service name.
+func (s *Service) Name() string {
+ return ServiceName
}
// Serve serves the service.
func (s *Service) Serve() chan error {
+ s.close = make(chan struct{}, 1)
errCh := make(chan error, 1)
- server := rpc.NewServer()
- if server == nil {
- errCh <- errors.New("rpc server is nil")
- return errCh
- }
- s.rpc = server
- if len(s.services) == 0 {
- errCh <- errors.New("no services with RPC")
- return errCh
- }
+ s.rpc = rpc.NewServer()
// Attach all services
for i := 0; i < len(s.services); i++ {
err := s.Register(s.services[i].name, s.services[i].service)
if err != nil {
- errCh <- err
+ errCh <- errors.E(errors.Op("register service"), err)
return errCh
}
}
@@ -85,7 +84,10 @@ func (s *Service) Serve() chan error {
select {
case <-s.close:
// log error
- errCh <- ln.Close()
+ err := ln.Close()
+ if err != nil {
+ errCh <- errors.E(errors.Op("close RPC socket"), err)
+ }
return
default:
conn, err := ln.Accept()
@@ -98,7 +100,7 @@ func (s *Service) Serve() chan error {
}
}()
- return nil
+ return errCh
}
// Stop stops the service.
@@ -109,12 +111,12 @@ func (s *Service) Stop() error {
func (s *Service) Depends() []interface{} {
return []interface{}{
- s.RpcService,
+ s.RegisterService,
}
}
-func (s *Service) RpcService(p PluginRpc) error {
- service, err := p.RpcService()
+func (s *Service) RegisterService(p RPCPluggable) error {
+ service, err := p.RPCService()
if err != nil {
return err
}
@@ -136,7 +138,7 @@ func (s *Service) RpcService(p PluginRpc) error {
// no suitable methods. It also logs the error using package log.
func (s *Service) Register(name string, svc interface{}) error {
if s.rpc == nil {
- return errors.New("RPC service is not configured")
+ return errors.E("RPC service is not configured")
}
return s.rpc.RegisterName(name, svc)
@@ -144,10 +146,6 @@ func (s *Service) Register(name string, svc interface{}) error {
// Client creates new RPC client.
func (s *Service) Client() (*rpc.Client, error) {
- if s.configProvider == nil {
- return nil, errors.New("RPC service is not configured")
- }
-
conn, err := s.config.Dialer()
if err != nil {
return nil, err