summaryrefslogtreecommitdiff
path: root/plugins/rpc/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/rpc/plugin.go')
-rw-r--r--plugins/rpc/plugin.go155
1 files changed, 0 insertions, 155 deletions
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
deleted file mode 100644
index b8ee6d13..00000000
--- a/plugins/rpc/plugin.go
+++ /dev/null
@@ -1,155 +0,0 @@
-package rpc
-
-import (
- "net"
- "net/rpc"
- "sync/atomic"
-
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/errors"
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-// PluginName contains default plugin name.
-const PluginName = "rpc"
-
-// Plugin is RPC service.
-type Plugin struct {
- cfg Config
- log logger.Logger
- rpc *rpc.Server
- // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC
- plugins map[string]RPCer
- listener net.Listener
- closed uint32
-}
-
-// Init rpc service. Must return true if service is enabled.
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("rpc_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
- if err != nil {
- return errors.E(op, errors.Disabled, err)
- }
- // Init defaults
- s.cfg.InitDefaults()
- // Init pluggable plugins map
- s.plugins = make(map[string]RPCer, 5)
- // init logs
- s.log = log
-
- // set up state
- atomic.StoreUint32(&s.closed, 0)
-
- // validate config
- err = s.cfg.Valid()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// Serve serves the service.
-func (s *Plugin) Serve() chan error {
- const op = errors.Op("rpc_plugin_serve")
- errCh := make(chan error, 1)
-
- s.rpc = rpc.NewServer()
-
- plugins := make([]string, 0, len(s.plugins))
-
- // Attach all services
- for name := range s.plugins {
- err := s.Register(name, s.plugins[name].RPC())
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- plugins = append(plugins, name)
- }
-
- var err error
- s.listener, err = s.cfg.Listener()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- s.log.Debug("Started RPC service", "address", s.cfg.Listen, "plugins", plugins)
-
- go func() {
- for {
- conn, err := s.listener.Accept()
- if err != nil {
- if atomic.LoadUint32(&s.closed) == 1 {
- // just continue, this is not a critical issue, we just called Stop
- return
- }
-
- s.log.Error("listener accept error", "error", err)
- errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
- return
- }
-
- go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn))
- }
- }()
-
- return errCh
-}
-
-// Stop stops the service.
-func (s *Plugin) Stop() error {
- const op = errors.Op("rpc_plugin_stop")
- // store closed state
- atomic.StoreUint32(&s.closed, 1)
- err := s.listener.Close()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// Name contains service name.
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (s *Plugin) Available() {
-}
-
-// Collects all plugins which implement Name + RPCer interfaces
-func (s *Plugin) Collects() []interface{} {
- return []interface{}{
- s.RegisterPlugin,
- }
-}
-
-// RegisterPlugin registers RPC service plugin.
-func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) {
- s.plugins[name.Name()] = p
-}
-
-// Register publishes in the server the set of methods of the
-// receiver value that satisfy the following conditions:
-// - exported method of exported type
-// - two arguments, both of exported type
-// - the second argument is a pointer
-// - one return value, of type error
-// It returns an error if the receiver is not an exported type or has
-// no suitable methods. It also logs the error using package log.
-func (s *Plugin) Register(name string, svc interface{}) error {
- if s.rpc == nil {
- return errors.E("RPC service is not configured")
- }
-
- return s.rpc.RegisterName(name, svc)
-}