summaryrefslogtreecommitdiff
path: root/plugins/rpc/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-19 18:10:50 +0300
committerValery Piashchynski <[email protected]>2020-10-19 18:10:50 +0300
commit87c57fd2191dbf5ad6a69a0b6e50a01ff8d9cadd (patch)
tree9103dad9fad35f62fecb82f66a9fa48fcc607842 /plugins/rpc/rpc.go
parent6f39542d75d0da1e0ff09906bdd340f855a409af (diff)
Initial implementation of the RPC RR 2.0 plugin
Diffstat (limited to 'plugins/rpc/rpc.go')
-rw-r--r--plugins/rpc/rpc.go175
1 files changed, 175 insertions, 0 deletions
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
new file mode 100644
index 00000000..eb999b2f
--- /dev/null
+++ b/plugins/rpc/rpc.go
@@ -0,0 +1,175 @@
+package rpc
+
+import (
+ "errors"
+
+ "github.com/spiral/goridge/v2"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+
+ "net/rpc"
+)
+
+type Plugin interface {
+ Name() string
+ RpcService() interface{}
+}
+
+// ID contains default service name.
+const ID = "rpc"
+
+type services struct {
+ service interface{}
+ name string
+}
+
+// Service is RPC service.
+type Service struct {
+ // TODO do we need a pointer here since all receivers are pointers??
+ rpc *rpc.Server
+ configProvider config.Provider
+ services []services
+ config Config
+ close chan struct{}
+}
+
+// Init rpc service. Must return true if service is enabled.
+func (s *Service) Init(cfg config.Provider) error {
+ s.configProvider = cfg
+ s.close = make(chan struct{})
+
+ return nil
+}
+
+func (s *Service) Configure() error {
+ err := s.configProvider.UnmarshalKey(ID, &s.config)
+ if err != nil {
+ return err
+ }
+
+ // TODO Do we need to init defaults
+ if s.config.Listen == "" {
+ s.config.InitDefaults()
+ }
+
+ server := rpc.NewServer()
+ if server == nil {
+ return errors.New("rpc server is il")
+ }
+ s.rpc = server
+ return nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ //s.mu.Lock()
+ //s.serving = true
+ //s.stop = make(chan interface{})
+ //s.mu.Unlock()
+ if len(s.services) == 0 {
+ errCh <- errors.New("no services with RPC")
+ return errCh
+ }
+
+ // Attach all services
+ for i := 0; i < len(s.services); i++ {
+ err := s.Register(s.services[i].name, s.services[i].service)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ }
+
+ ln, err := s.config.Listener()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ defer func() {
+ errCh <- ln.Close()
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-s.close:
+ return
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+ }()
+
+ //
+ //s.mu.Lock()
+ //s.serving = false
+ //s.mu.Unlock()
+
+ return nil
+}
+
+func (s *Service) Close() error {
+ s.close <- struct{}{}
+ return nil
+}
+
+// Stop stops the service.
+func (s *Service) Stop() {
+ //s.mu.Lock()
+ //defer s.mu.Unlock()
+ //
+ //if s.serving {
+ // close(s.stop)
+ //}
+
+}
+
+func (s *Service) Depends() []interface{} {
+ return []interface{}{
+ s.RpcService,
+ }
+}
+
+func (s *Service) RpcService(p Plugin) error {
+ s.services = append(s.services, services{
+ service: p.RpcService(),
+ name: p.Name(),
+ })
+ return nil
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, svc interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, svc)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.configProvider == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.config.Dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}