diff options
Diffstat (limited to 'plugins/rpc')
-rwxr-xr-x | plugins/rpc/config.go | 49 | ||||
-rwxr-xr-x | plugins/rpc/config_test.go | 143 | ||||
-rwxr-xr-x | plugins/rpc/doc/plugin_arch.drawio | 1 | ||||
-rwxr-xr-x | plugins/rpc/plugin.go | 165 | ||||
-rw-r--r-- | plugins/rpc/tests/.rr-rpc-disabled.yaml | 6 | ||||
-rw-r--r-- | plugins/rpc/tests/.rr.yaml | 6 | ||||
-rw-r--r-- | plugins/rpc/tests/plugin1.go | 42 | ||||
-rw-r--r-- | plugins/rpc/tests/plugin2.go | 54 | ||||
-rw-r--r-- | plugins/rpc/tests/rpc_test.go | 169 |
9 files changed, 0 insertions, 635 deletions
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go deleted file mode 100755 index 719fd5e3..00000000 --- a/plugins/rpc/config.go +++ /dev/null @@ -1,49 +0,0 @@ -package rpc - -import ( - "errors" - "net" - "strings" - - "github.com/spiral/roadrunner/v2/util" -) - -// Config defines RPC service config. -type Config struct { - // Listen string - Listen string - - // Disabled disables RPC service. - Disabled bool -} - -// InitDefaults allows to init blank config with pre-defined set of default values. -func (c *Config) InitDefaults() { - if c.Listen == "" { - c.Listen = "tcp://127.0.0.1:6001" - } -} - -// Valid returns nil if config is valid. -func (c *Config) Valid() error { - if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { - return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") - } - - return nil -} - -// Listener creates new rpc socket Listener. -func (c *Config) Listener() (net.Listener, error) { - return util.CreateListener(c.Listen) -} - -// Dialer creates rpc socket Dialer. -func (c *Config) Dialer() (net.Conn, error) { - dsn := strings.Split(c.Listen, "://") - if len(dsn) != 2 { - return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") - } - - return net.Dial(dsn[0], dsn[1]) -} diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go deleted file mode 100755 index 67532bc8..00000000 --- a/plugins/rpc/config_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package rpc - -import ( - "runtime" - "testing" - - j "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -type testCfg struct{ cfg string } - -func (cfg *testCfg) Unmarshal(out interface{}) error { - return json.Unmarshal([]byte(cfg.cfg), out) -} - -func TestConfig_Listener(t *testing.T) { - cfg := &Config{Listen: "tcp://:18001"} - - ln, err := cfg.Listener() - assert.NoError(t, err) - assert.NotNil(t, ln) - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - assert.Equal(t, "tcp", ln.Addr().Network()) - if runtime.GOOS == "windows" { - assert.Equal(t, "[::]:18001", ln.Addr().String()) - } else { - assert.Equal(t, "0.0.0.0:18001", ln.Addr().String()) - } -} - -func TestConfig_ListenerUnix(t *testing.T) { - cfg := &Config{Listen: "unix://file.sock"} - - ln, err := cfg.Listener() - assert.NoError(t, err) - assert.NotNil(t, ln) - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - assert.Equal(t, "unix", ln.Addr().Network()) - assert.Equal(t, "file.sock", ln.Addr().String()) -} - -func Test_Config_Error(t *testing.T) { - cfg := &Config{Listen: "uni:unix.sock"} - ln, err := cfg.Listener() - assert.Nil(t, ln) - assert.Error(t, err) - assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error()) -} - -func Test_Config_ErrorMethod(t *testing.T) { - cfg := &Config{Listen: "xinu://unix.sock"} - - ln, err := cfg.Listener() - assert.Nil(t, ln) - assert.Error(t, err) -} - -func TestConfig_Dialer(t *testing.T) { - cfg := &Config{Listen: "tcp://:18001"} - - ln, _ := cfg.Listener() - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - conn, err := cfg.Dialer() - assert.NoError(t, err) - assert.NotNil(t, conn) - defer func() { - err := conn.Close() - if err != nil { - t.Errorf("error closing the connection: error %v", err) - } - }() - - assert.Equal(t, "tcp", conn.RemoteAddr().Network()) - assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) -} - -func TestConfig_DialerUnix(t *testing.T) { - cfg := &Config{Listen: "unix://file.sock"} - - ln, _ := cfg.Listener() - defer func() { - err := ln.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - conn, err := cfg.Dialer() - assert.NoError(t, err) - assert.NotNil(t, conn) - defer func() { - err := conn.Close() - if err != nil { - t.Errorf("error closing the connection: error %v", err) - } - }() - - assert.Equal(t, "unix", conn.RemoteAddr().Network()) - assert.Equal(t, "file.sock", conn.RemoteAddr().String()) -} - -func Test_Config_DialerError(t *testing.T) { - cfg := &Config{Listen: "uni:unix.sock"} - ln, err := cfg.Dialer() - assert.Nil(t, ln) - assert.Error(t, err) - assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error()) -} - -func Test_Config_DialerErrorMethod(t *testing.T) { - cfg := &Config{Listen: "xinu://unix.sock"} - - ln, err := cfg.Dialer() - assert.Nil(t, ln) - assert.Error(t, err) -} - -func Test_Config_Defaults(t *testing.T) { - c := &Config{} - c.InitDefaults() - assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen) -} diff --git a/plugins/rpc/doc/plugin_arch.drawio b/plugins/rpc/doc/plugin_arch.drawio deleted file mode 100755 index dec5f0b2..00000000 --- a/plugins/rpc/doc/plugin_arch.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2020-10-19T17:14:19.125Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="2J39x4EyFr1zaE9BXKM4" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go deleted file mode 100755 index d0dc0ff1..00000000 --- a/plugins/rpc/plugin.go +++ /dev/null @@ -1,165 +0,0 @@ -package rpc - -import ( - "net" - "net/rpc" - "sync/atomic" - - "github.com/spiral/endure" - "github.com/spiral/errors" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/interfaces/config" - "github.com/spiral/roadrunner/v2/interfaces/log" - rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" -) - -// PluginName contains default plugin name. -const PluginName = "RPC" - -type pluggable struct { - service rpc_.RPCer - name string -} - -// Plugin is RPC service. -type Plugin struct { - cfg Config - log log.Logger - rpc *rpc.Server - services []pluggable - listener net.Listener - closed *uint32 -} - -// Init rpc service. Must return true if service is enabled. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error { - const op = errors.Op("RPC Init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return err - } - s.cfg.InitDefaults() - - if s.cfg.Disabled { - return errors.E(op, errors.Disabled) - } - - s.log = log - state := uint32(0) - s.closed = &state - atomic.StoreUint32(s.closed, 0) - - return s.cfg.Valid() -} - -// Serve serves the service. -func (s *Plugin) Serve() chan error { - const op = errors.Op("register service") - errCh := make(chan error, 1) - - s.rpc = rpc.NewServer() - - services := make([]string, 0, len(s.services)) - - // Attach all services - for i := 0; i < len(s.services); i++ { - err := s.Register(s.services[i].name, s.services[i].service.RPC()) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - services = append(services, s.services[i].name) - } - - var err error - s.listener, err = s.cfg.Listener() - if err != nil { - errCh <- err - return errCh - } - - s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services) - - go func() { - for { - conn, err := s.listener.Accept() - if err != nil { - if atomic.LoadUint32(s.closed) == 1 { - // just log and continue, this is not a critical issue, we just called Stop - s.log.Error("listener accept error, connection closed", "error", err) - return - } - - s.log.Error("listener accept error", "error", err) - errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err) - return - } - - go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn)) - } - }() - - return errCh -} - -// Stop stops the service. -func (s *Plugin) Stop() error { - // store closed state - atomic.StoreUint32(s.closed, 1) - err := s.listener.Close() - if err != nil { - return errors.E(errors.Op("stop RPC socket"), err) - } - return nil -} - -// Name contains service name. -func (s *Plugin) Name() string { - return PluginName -} - -// Depends declares services to collect for RPC. -func (s *Plugin) Collects() []interface{} { - return []interface{}{ - s.RegisterPlugin, - } -} - -// RegisterPlugin registers RPC service plugin. -func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) { - s.services = append(s.services, pluggable{ - service: p, - name: name.Name(), - }) -} - -// Register publishes in the server the set of methods of the -// receiver value that satisfy the following conditions: -// - exported method of exported type -// - two arguments, both of exported type -// - the second argument is a pointer -// - one return value, of type error -// It returns an error if the receiver is not an exported type or has -// no suitable methods. It also logs the error using package log. -func (s *Plugin) Register(name string, svc interface{}) error { - if s.rpc == nil { - return errors.E("RPC service is not configured") - } - - return s.rpc.RegisterName(name, svc) -} - -// Client creates new RPC client. -func (s *Plugin) Client() (*rpc.Client, error) { - conn, err := s.cfg.Dialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil -} diff --git a/plugins/rpc/tests/.rr-rpc-disabled.yaml b/plugins/rpc/tests/.rr-rpc-disabled.yaml deleted file mode 100644 index d5c185e7..00000000 --- a/plugins/rpc/tests/.rr-rpc-disabled.yaml +++ /dev/null @@ -1,6 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - disabled: true -logs: - mode: development - level: error
\ No newline at end of file diff --git a/plugins/rpc/tests/.rr.yaml b/plugins/rpc/tests/.rr.yaml deleted file mode 100644 index d2cb6c70..00000000 --- a/plugins/rpc/tests/.rr.yaml +++ /dev/null @@ -1,6 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - disabled: false -logs: - mode: development - level: error
\ No newline at end of file diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go deleted file mode 100644 index 797f821a..00000000 --- a/plugins/rpc/tests/plugin1.go +++ /dev/null @@ -1,42 +0,0 @@ -package tests - -import ( - "fmt" - - "github.com/spiral/roadrunner/v2/interfaces/config" -) - -type Plugin1 struct { - config config.Configurer -} - -func (p1 *Plugin1) Init(cfg config.Configurer) error { - p1.config = cfg - return nil -} - -func (p1 *Plugin1) Serve() chan error { - errCh := make(chan error, 1) - return errCh -} - -func (p1 *Plugin1) Stop() error { - return nil -} - -func (p1 *Plugin1) Name() string { - return "rpc_test.plugin1" -} - -func (p1 *Plugin1) RPC() interface{} { - return &PluginRPC{srv: p1} -} - -type PluginRPC struct { - srv *Plugin1 -} - -func (r *PluginRPC) Hello(in string, out *string) error { - *out = fmt.Sprintf("Hello, username: %s", in) - return nil -} diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go deleted file mode 100644 index 411b9c54..00000000 --- a/plugins/rpc/tests/plugin2.go +++ /dev/null @@ -1,54 +0,0 @@ -package tests - -import ( - "net" - "net/rpc" - "time" - - "github.com/spiral/errors" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" -) - -// plugin2 makes a call to the plugin1 via RPC -// this is just a simulation of external call FOR TEST -// you don't need to do such things :) -type Plugin2 struct { -} - -func (p2 *Plugin2) Init() error { - return nil -} - -func (p2 *Plugin2) Serve() chan error { - errCh := make(chan error, 1) - - go func() { - time.Sleep(time.Second * 3) - - conn, err := net.Dial("tcp", "127.0.0.1:6001") - if err != nil { - errCh <- errors.E(errors.Serve, err) - return - } - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret string - err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret) - if err != nil { - errCh <- err - return - } - if ret != "Hello, username: Valery" { - errCh <- errors.E("wrong response") - return - } - // to stop exec - errCh <- errors.E(errors.Disabled) - return - }() - - return errCh -} - -func (p2 *Plugin2) Stop() error { - return nil -} diff --git a/plugins/rpc/tests/rpc_test.go b/plugins/rpc/tests/rpc_test.go deleted file mode 100644 index 0344da6b..00000000 --- a/plugins/rpc/tests/rpc_test.go +++ /dev/null @@ -1,169 +0,0 @@ -package tests - -import ( - "os" - "os/signal" - "syscall" - "testing" - "time" - - "github.com/spiral/endure" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/stretchr/testify/assert" -) - -// graph https://bit.ly/3ensdNb -func TestRpcInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&Plugin1{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&Plugin2{}) - if err != nil { - t.Fatal(err) - } - - v := &config.Viper{} - v.Path = ".rr.yaml" - v.Prefix = "rr" - err = cont.Register(v) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&rpc.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - tt := time.NewTimer(time.Second * 10) - - for { - select { - case e := <-ch: - // just stop, this is ok - if errors.Is(errors.Disabled, e.Error) { - return - } - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-tt.C: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - assert.Fail(t, "timeout") - } - } -} - -// graph https://bit.ly/3ensdNb -func TestRpcDisabled(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&Plugin1{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&Plugin2{}) - if err != nil { - t.Fatal(err) - } - - v := &config.Viper{} - v.Path = ".rr-rpc-disabled.yaml" - v.Prefix = "rr" - err = cont.Register(v) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&rpc.Plugin{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Register(&logger.ZapLogger{}) - if err != nil { - t.Fatal(err) - } - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - tt := time.NewTimer(time.Second * 20) - - for { - select { - case e := <-ch: - // RPC is turned off, should be and dial error - if errors.Is(errors.Disabled, e.Error) { - assert.FailNow(t, "should not be disabled error") - } - assert.Error(t, e.Error) - err = cont.Stop() - assert.Error(t, err) - return - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-tt.C: - // timeout - return - } - } -} |