diff options
author | Valery Piashchynski <[email protected]> | 2020-12-26 00:40:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-26 00:40:31 +0300 |
commit | 7a0dee1a416705c621edbf50e1f43fb39845348f (patch) | |
tree | 0007a6b8c8ac9e7d31b8a5f3f7f27669c860d261 /plugins/rpc | |
parent | 8526c03822e724bc2ebb64b6197085fea335b782 (diff) |
Huge tests refactoring. Reduce running time 2-3x times
Diffstat (limited to 'plugins/rpc')
-rw-r--r-- | plugins/rpc/config.go | 47 | ||||
-rw-r--r-- | plugins/rpc/doc/plugin_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/rpc/interface.go | 7 | ||||
-rw-r--r-- | plugins/rpc/plugin.go | 164 | ||||
-rw-r--r-- | plugins/rpc/util.go | 57 |
5 files changed, 276 insertions, 0 deletions
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go new file mode 100644 index 00000000..7f3474d7 --- /dev/null +++ b/plugins/rpc/config.go @@ -0,0 +1,47 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +// Config defines RPC service config. +type Config struct { + // Listen string + Listen string + + // Disabled disables RPC service. + Disabled bool +} + +// InitDefaults allows to init blank config with pre-defined set of default values. +func (c *Config) InitDefaults() { + if c.Listen == "" { + c.Listen = "tcp://127.0.0.1:6001" + } +} + +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return nil +} + +// Listener creates new rpc socket Listener. +func (c *Config) Listener() (net.Listener, error) { + return CreateListener(c.Listen) +} + +// Dialer creates rpc socket Dialer. +func (c *Config) Dialer() (net.Conn, error) { + dsn := strings.Split(c.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/plugins/rpc/doc/plugin_arch.drawio b/plugins/rpc/doc/plugin_arch.drawio new file mode 100644 index 00000000..dec5f0b2 --- /dev/null +++ b/plugins/rpc/doc/plugin_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2020-10-19T17:14:19.125Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="2J39x4EyFr1zaE9BXKM4" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/rpc/interface.go b/plugins/rpc/interface.go new file mode 100644 index 00000000..683fd2ec --- /dev/null +++ b/plugins/rpc/interface.go @@ -0,0 +1,7 @@ +package rpc + +// RPCer declares the ability to create set of public RPC methods. +type RPCer interface { + // Provides RPC methods for the given service. + RPC() interface{} +} diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go new file mode 100644 index 00000000..6a83326b --- /dev/null +++ b/plugins/rpc/plugin.go @@ -0,0 +1,164 @@ +package rpc + +import ( + "net" + "net/rpc" + "sync/atomic" + + "github.com/spiral/endure" + "github.com/spiral/errors" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// PluginName contains default plugin name. +const PluginName = "RPC" + +type pluggable struct { + service RPCer + name string +} + +// Plugin is RPC service. +type Plugin struct { + cfg Config + log logger.Logger + rpc *rpc.Server + services []pluggable + listener net.Listener + closed *uint32 +} + +// Init rpc service. Must return true if service is enabled. +func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("RPC plugin init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + s.cfg.InitDefaults() + + if s.cfg.Disabled { + return errors.E(op, errors.Disabled) + } + + s.log = log + state := uint32(0) + s.closed = &state + atomic.StoreUint32(s.closed, 0) + + return s.cfg.Valid() +} + +// Serve serves the service. +func (s *Plugin) Serve() chan error { + const op = errors.Op("register service") + errCh := make(chan error, 1) + + s.rpc = rpc.NewServer() + + services := make([]string, 0, len(s.services)) + + // Attach all services + for i := 0; i < len(s.services); i++ { + err := s.Register(s.services[i].name, s.services[i].service.RPC()) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + services = append(services, s.services[i].name) + } + + var err error + s.listener, err = s.cfg.Listener() + if err != nil { + errCh <- err + return errCh + } + + s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services) + + go func() { + for { + conn, err := s.listener.Accept() + if err != nil { + if atomic.LoadUint32(s.closed) == 1 { + // just log and continue, this is not a critical issue, we just called Stop + s.log.Warn("listener accept error, connection closed", "error", err) + return + } + + s.log.Error("listener accept error", "error", err) + errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err) + return + } + + go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn)) + } + }() + + return errCh +} + +// Stop stops the service. +func (s *Plugin) Stop() error { + // store closed state + atomic.StoreUint32(s.closed, 1) + err := s.listener.Close() + if err != nil { + return errors.E(errors.Op("stop RPC socket"), err) + } + return nil +} + +// Name contains service name. +func (s *Plugin) Name() string { + return PluginName +} + +// Depends declares services to collect for RPC. +func (s *Plugin) Collects() []interface{} { + return []interface{}{ + s.RegisterPlugin, + } +} + +// RegisterPlugin registers RPC service plugin. +func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) { + s.services = append(s.services, pluggable{ + service: p, + name: name.Name(), + }) +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Plugin) Register(name string, svc interface{}) error { + if s.rpc == nil { + return errors.E("RPC service is not configured") + } + + return s.rpc.RegisterName(name, svc) +} + +// Client creates new RPC client. +func (s *Plugin) Client() (*rpc.Client, error) { + conn, err := s.cfg.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil +} diff --git a/plugins/rpc/util.go b/plugins/rpc/util.go new file mode 100644 index 00000000..29a475a4 --- /dev/null +++ b/plugins/rpc/util.go @@ -0,0 +1,57 @@ +package rpc + +import ( + "errors" + "fmt" + "net" + "os" + "strings" + "syscall" + + "github.com/valyala/tcplisten" +) + +// CreateListener crates socket listener based on DSN definition. +func CreateListener(address string) (net.Listener, error) { + dsn := strings.Split(address, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + } + + if dsn[0] != "unix" && dsn[0] != "tcp" { + return nil, errors.New("invalid Protocol (tcp://:6001, unix://file.sock)") + } + + // create unix listener + if dsn[0] == "unix" { + // check if the file exist + if fileExists(dsn[1]) { + err := syscall.Unlink(dsn[1]) + if err != nil { + return nil, fmt.Errorf("error during the unlink syscall: error %v", err) + } + } + return net.Listen(dsn[0], dsn[1]) + } + + // configure and create tcp4 listener + cfg := tcplisten.Config{ + ReusePort: true, + DeferAccept: true, + FastOpen: true, + Backlog: 0, + } + + // only tcp4 is currently supported + return cfg.NewListener("tcp4", dsn[1]) +} + +// fileExists checks if a file exists and is not a directory before we +// try using it to prevent further errors. +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} |