summaryrefslogtreecommitdiff
path: root/plugins/rpc/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-26 00:40:31 +0300
committerValery Piashchynski <[email protected]>2020-12-26 00:40:31 +0300
commit7a0dee1a416705c621edbf50e1f43fb39845348f (patch)
tree0007a6b8c8ac9e7d31b8a5f3f7f27669c860d261 /plugins/rpc/plugin.go
parent8526c03822e724bc2ebb64b6197085fea335b782 (diff)
Huge tests refactoring. Reduce running time 2-3x times
Diffstat (limited to 'plugins/rpc/plugin.go')
-rw-r--r--plugins/rpc/plugin.go164
1 files changed, 164 insertions, 0 deletions
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
new file mode 100644
index 00000000..6a83326b
--- /dev/null
+++ b/plugins/rpc/plugin.go
@@ -0,0 +1,164 @@
+package rpc
+
+import (
+ "net"
+ "net/rpc"
+ "sync/atomic"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/errors"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// PluginName contains default plugin name.
+const PluginName = "RPC"
+
+type pluggable struct {
+ service RPCer
+ name string
+}
+
+// Plugin is RPC service.
+type Plugin struct {
+ cfg Config
+ log logger.Logger
+ rpc *rpc.Server
+ services []pluggable
+ listener net.Listener
+ closed *uint32
+}
+
+// Init rpc service. Must return true if service is enabled.
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("RPC plugin init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+ s.cfg.InitDefaults()
+
+ if s.cfg.Disabled {
+ return errors.E(op, errors.Disabled)
+ }
+
+ s.log = log
+ state := uint32(0)
+ s.closed = &state
+ atomic.StoreUint32(s.closed, 0)
+
+ return s.cfg.Valid()
+}
+
+// Serve serves the service.
+func (s *Plugin) Serve() chan error {
+ const op = errors.Op("register service")
+ errCh := make(chan error, 1)
+
+ s.rpc = rpc.NewServer()
+
+ services := make([]string, 0, len(s.services))
+
+ // Attach all services
+ for i := 0; i < len(s.services); i++ {
+ err := s.Register(s.services[i].name, s.services[i].service.RPC())
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ services = append(services, s.services[i].name)
+ }
+
+ var err error
+ s.listener, err = s.cfg.Listener()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services)
+
+ go func() {
+ for {
+ conn, err := s.listener.Accept()
+ if err != nil {
+ if atomic.LoadUint32(s.closed) == 1 {
+ // just log and continue, this is not a critical issue, we just called Stop
+ s.log.Warn("listener accept error, connection closed", "error", err)
+ return
+ }
+
+ s.log.Error("listener accept error", "error", err)
+ errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
+ return
+ }
+
+ go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn))
+ }
+ }()
+
+ return errCh
+}
+
+// Stop stops the service.
+func (s *Plugin) Stop() error {
+ // store closed state
+ atomic.StoreUint32(s.closed, 1)
+ err := s.listener.Close()
+ if err != nil {
+ return errors.E(errors.Op("stop RPC socket"), err)
+ }
+ return nil
+}
+
+// Name contains service name.
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+// Depends declares services to collect for RPC.
+func (s *Plugin) Collects() []interface{} {
+ return []interface{}{
+ s.RegisterPlugin,
+ }
+}
+
+// RegisterPlugin registers RPC service plugin.
+func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) {
+ s.services = append(s.services, pluggable{
+ service: p,
+ name: name.Name(),
+ })
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Plugin) Register(name string, svc interface{}) error {
+ if s.rpc == nil {
+ return errors.E("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, svc)
+}
+
+// Client creates new RPC client.
+func (s *Plugin) Client() (*rpc.Client, error) {
+ conn, err := s.cfg.Dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
+}