diff options
author | Valery Piashchynski <[email protected]> | 2020-10-19 18:10:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-19 18:10:50 +0300 |
commit | 87c57fd2191dbf5ad6a69a0b6e50a01ff8d9cadd (patch) | |
tree | 9103dad9fad35f62fecb82f66a9fa48fcc607842 /plugins/rpc/rpc.go | |
parent | 6f39542d75d0da1e0ff09906bdd340f855a409af (diff) |
Initial implementation of the RPC RR 2.0 plugin
Diffstat (limited to 'plugins/rpc/rpc.go')
-rw-r--r-- | plugins/rpc/rpc.go | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go new file mode 100644 index 00000000..eb999b2f --- /dev/null +++ b/plugins/rpc/rpc.go @@ -0,0 +1,175 @@ +package rpc + +import ( + "errors" + + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + + "net/rpc" +) + +type Plugin interface { + Name() string + RpcService() interface{} +} + +// ID contains default service name. +const ID = "rpc" + +type services struct { + service interface{} + name string +} + +// Service is RPC service. +type Service struct { + // TODO do we need a pointer here since all receivers are pointers?? + rpc *rpc.Server + configProvider config.Provider + services []services + config Config + close chan struct{} +} + +// Init rpc service. Must return true if service is enabled. +func (s *Service) Init(cfg config.Provider) error { + s.configProvider = cfg + s.close = make(chan struct{}) + + return nil +} + +func (s *Service) Configure() error { + err := s.configProvider.UnmarshalKey(ID, &s.config) + if err != nil { + return err + } + + // TODO Do we need to init defaults + if s.config.Listen == "" { + s.config.InitDefaults() + } + + server := rpc.NewServer() + if server == nil { + return errors.New("rpc server is il") + } + s.rpc = server + return nil +} + +// Serve serves the service. +func (s *Service) Serve() chan error { + errCh := make(chan error, 1) + + //s.mu.Lock() + //s.serving = true + //s.stop = make(chan interface{}) + //s.mu.Unlock() + if len(s.services) == 0 { + errCh <- errors.New("no services with RPC") + return errCh + } + + // Attach all services + for i := 0; i < len(s.services); i++ { + err := s.Register(s.services[i].name, s.services[i].service) + if err != nil { + errCh <- err + return errCh + } + } + + ln, err := s.config.Listener() + if err != nil { + errCh <- err + return errCh + } + defer func() { + errCh <- ln.Close() + }() + + go func() { + for { + select { + case <-s.close: + return + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + // + //s.mu.Lock() + //s.serving = false + //s.mu.Unlock() + + return nil +} + +func (s *Service) Close() error { + s.close <- struct{}{} + return nil +} + +// Stop stops the service. +func (s *Service) Stop() { + //s.mu.Lock() + //defer s.mu.Unlock() + // + //if s.serving { + // close(s.stop) + //} + +} + +func (s *Service) Depends() []interface{} { + return []interface{}{ + s.RpcService, + } +} + +func (s *Service) RpcService(p Plugin) error { + s.services = append(s.services, services{ + service: p.RpcService(), + name: p.Name(), + }) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, svc interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, svc) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.configProvider == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.config.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} |