summaryrefslogtreecommitdiff
path: root/plugins/rpc/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-21 19:42:23 +0300
committerValery Piashchynski <[email protected]>2020-12-21 19:42:23 +0300
commitee8b4075c0f836d698d1ae505c87c17147de447a (patch)
tree531d980e5bfb94ee39b03952a97e0445f7955409 /plugins/rpc/plugin.go
parent0ad45031047bb479e06ce0a0f496c6db9b2641c9 (diff)
Move plugins to the roadrunner-plugins repository
Diffstat (limited to 'plugins/rpc/plugin.go')
-rwxr-xr-xplugins/rpc/plugin.go165
1 files changed, 0 insertions, 165 deletions
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
deleted file mode 100755
index d0dc0ff1..00000000
--- a/plugins/rpc/plugin.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package rpc
-
-import (
- "net"
- "net/rpc"
- "sync/atomic"
-
- "github.com/spiral/endure"
- "github.com/spiral/errors"
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/interfaces/config"
- "github.com/spiral/roadrunner/v2/interfaces/log"
- rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc"
-)
-
-// PluginName contains default plugin name.
-const PluginName = "RPC"
-
-type pluggable struct {
- service rpc_.RPCer
- name string
-}
-
-// Plugin is RPC service.
-type Plugin struct {
- cfg Config
- log log.Logger
- rpc *rpc.Server
- services []pluggable
- listener net.Listener
- closed *uint32
-}
-
-// Init rpc service. Must return true if service is enabled.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
- const op = errors.Op("RPC Init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
- if err != nil {
- return err
- }
- s.cfg.InitDefaults()
-
- if s.cfg.Disabled {
- return errors.E(op, errors.Disabled)
- }
-
- s.log = log
- state := uint32(0)
- s.closed = &state
- atomic.StoreUint32(s.closed, 0)
-
- return s.cfg.Valid()
-}
-
-// Serve serves the service.
-func (s *Plugin) Serve() chan error {
- const op = errors.Op("register service")
- errCh := make(chan error, 1)
-
- s.rpc = rpc.NewServer()
-
- services := make([]string, 0, len(s.services))
-
- // Attach all services
- for i := 0; i < len(s.services); i++ {
- err := s.Register(s.services[i].name, s.services[i].service.RPC())
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- services = append(services, s.services[i].name)
- }
-
- var err error
- s.listener, err = s.cfg.Listener()
- if err != nil {
- errCh <- err
- return errCh
- }
-
- s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services)
-
- go func() {
- for {
- conn, err := s.listener.Accept()
- if err != nil {
- if atomic.LoadUint32(s.closed) == 1 {
- // just log and continue, this is not a critical issue, we just called Stop
- s.log.Error("listener accept error, connection closed", "error", err)
- return
- }
-
- s.log.Error("listener accept error", "error", err)
- errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
- return
- }
-
- go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn))
- }
- }()
-
- return errCh
-}
-
-// Stop stops the service.
-func (s *Plugin) Stop() error {
- // store closed state
- atomic.StoreUint32(s.closed, 1)
- err := s.listener.Close()
- if err != nil {
- return errors.E(errors.Op("stop RPC socket"), err)
- }
- return nil
-}
-
-// Name contains service name.
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Depends declares services to collect for RPC.
-func (s *Plugin) Collects() []interface{} {
- return []interface{}{
- s.RegisterPlugin,
- }
-}
-
-// RegisterPlugin registers RPC service plugin.
-func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) {
- s.services = append(s.services, pluggable{
- service: p,
- name: name.Name(),
- })
-}
-
-// Register publishes in the server the set of methods of the
-// receiver value that satisfy the following conditions:
-// - exported method of exported type
-// - two arguments, both of exported type
-// - the second argument is a pointer
-// - one return value, of type error
-// It returns an error if the receiver is not an exported type or has
-// no suitable methods. It also logs the error using package log.
-func (s *Plugin) Register(name string, svc interface{}) error {
- if s.rpc == nil {
- return errors.E("RPC service is not configured")
- }
-
- return s.rpc.RegisterName(name, svc)
-}
-
-// Client creates new RPC client.
-func (s *Plugin) Client() (*rpc.Client, error) {
- conn, err := s.cfg.Dialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
-}