summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-19 18:10:50 +0300
committerValery Piashchynski <[email protected]>2020-10-19 18:10:50 +0300
commit87c57fd2191dbf5ad6a69a0b6e50a01ff8d9cadd (patch)
tree9103dad9fad35f62fecb82f66a9fa48fcc607842
parent6f39542d75d0da1e0ff09906bdd340f855a409af (diff)
Initial implementation of the RPC RR 2.0 plugin
-rw-r--r--plugins/factory/app.go9
-rw-r--r--plugins/rpc/config.go46
-rw-r--r--plugins/rpc/config_test.go139
-rw-r--r--plugins/rpc/plugin_arch.drawio1
-rw-r--r--plugins/rpc/rpc.go175
-rw-r--r--plugins/rpc/rpc_test.go95
-rw-r--r--pool.go2
7 files changed, 457 insertions, 10 deletions
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index 753ca2a9..48f286e7 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -111,15 +111,6 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
}
}
-func (app *App) Serve() chan error {
- errCh := make(chan error)
- return errCh
-}
-
-func (app *App) Stop() error {
- return nil
-}
-
func (app *App) setEnv(e Env) []string {
env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
for k, v := range e {
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
new file mode 100644
index 00000000..1039ee5e
--- /dev/null
+++ b/plugins/rpc/config.go
@@ -0,0 +1,46 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+// Config defines RPC service config.
+type Config struct {
+ // Listen string
+ Listen string
+}
+
+// InitDefaults allows to init blank config with pre-defined set of default values.
+func (c *Config) InitDefaults() {
+ if c.Listen == "" {
+ c.Listen = "tcp://127.0.0.1:6001"
+ }
+}
+
+// Valid returns nil if config is valid.
+func (c *Config) Valid() error {
+ if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
+ return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
+ }
+
+ return nil
+}
+
+// Listener creates new rpc socket Listener.
+func (c *Config) Listener() (net.Listener, error) {
+ return util.CreateListener(c.Listen)
+}
+
+// Dialer creates rpc socket Dialer.
+func (c *Config) Dialer() (net.Conn, error) {
+ dsn := strings.Split(c.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go
new file mode 100644
index 00000000..dcc24ec1
--- /dev/null
+++ b/plugins/rpc/config_test.go
@@ -0,0 +1,139 @@
+package rpc
+
+import (
+ "testing"
+
+ json "github.com/json-iterator/go"
+ //"github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+)
+
+type testCfg struct{ cfg string }
+
+
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ j := json.ConfigCompatibleWithStandardLibrary
+ return j.Unmarshal([]byte(cfg.cfg), out)
+}
+
+func TestConfig_Listener(t *testing.T) {
+ cfg := &Config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.Listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "tcp", ln.Addr().Network())
+ assert.Equal(t, "0.0.0.0:18001", ln.Addr().String())
+}
+
+func TestConfig_ListenerUnix(t *testing.T) {
+ cfg := &Config{Listen: "unix://file.sock"}
+
+ ln, err := cfg.Listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "unix", ln.Addr().Network())
+ assert.Equal(t, "file.sock", ln.Addr().String())
+}
+
+func Test_Config_Error(t *testing.T) {
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error())
+}
+
+func Test_Config_ErrorMethod(t *testing.T) {
+ cfg := &Config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.Listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func TestConfig_Dialer(t *testing.T) {
+ cfg := &Config{Listen: "tcp://:18001"}
+
+ ln, _ := cfg.Listener()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ conn, err := cfg.Dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer func() {
+ err := conn.Close()
+ if err != nil {
+ t.Errorf("error closing the connection: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "tcp", conn.RemoteAddr().Network())
+ assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
+}
+
+func TestConfig_DialerUnix(t *testing.T) {
+ cfg := &Config{Listen: "unix://file.sock"}
+
+ ln, _ := cfg.Listener()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ conn, err := cfg.Dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer func() {
+ err := conn.Close()
+ if err != nil {
+ t.Errorf("error closing the connection: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "unix", conn.RemoteAddr().Network())
+ assert.Equal(t, "file.sock", conn.RemoteAddr().String())
+}
+
+func Test_Config_DialerError(t *testing.T) {
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error())
+}
+
+func Test_Config_DialerErrorMethod(t *testing.T) {
+ cfg := &Config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.Dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func Test_Config_Defaults(t *testing.T) {
+ c := &Config{}
+ c.InitDefaults()
+ assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen)
+}
diff --git a/plugins/rpc/plugin_arch.drawio b/plugins/rpc/plugin_arch.drawio
new file mode 100644
index 00000000..ee867b37
--- /dev/null
+++ b/plugins/rpc/plugin_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2020-10-19T14:58:31.470Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="ikeu7x4QzrcZ5Y9GcAHD" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
new file mode 100644
index 00000000..eb999b2f
--- /dev/null
+++ b/plugins/rpc/rpc.go
@@ -0,0 +1,175 @@
+package rpc
+
+import (
+ "errors"
+
+ "github.com/spiral/goridge/v2"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+
+ "net/rpc"
+)
+
+type Plugin interface {
+ Name() string
+ RpcService() interface{}
+}
+
+// ID contains default service name.
+const ID = "rpc"
+
+type services struct {
+ service interface{}
+ name string
+}
+
+// Service is RPC service.
+type Service struct {
+ // TODO do we need a pointer here since all receivers are pointers??
+ rpc *rpc.Server
+ configProvider config.Provider
+ services []services
+ config Config
+ close chan struct{}
+}
+
+// Init rpc service. Must return true if service is enabled.
+func (s *Service) Init(cfg config.Provider) error {
+ s.configProvider = cfg
+ s.close = make(chan struct{})
+
+ return nil
+}
+
+func (s *Service) Configure() error {
+ err := s.configProvider.UnmarshalKey(ID, &s.config)
+ if err != nil {
+ return err
+ }
+
+ // TODO Do we need to init defaults
+ if s.config.Listen == "" {
+ s.config.InitDefaults()
+ }
+
+ server := rpc.NewServer()
+ if server == nil {
+ return errors.New("rpc server is il")
+ }
+ s.rpc = server
+ return nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ //s.mu.Lock()
+ //s.serving = true
+ //s.stop = make(chan interface{})
+ //s.mu.Unlock()
+ if len(s.services) == 0 {
+ errCh <- errors.New("no services with RPC")
+ return errCh
+ }
+
+ // Attach all services
+ for i := 0; i < len(s.services); i++ {
+ err := s.Register(s.services[i].name, s.services[i].service)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ }
+
+ ln, err := s.config.Listener()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ defer func() {
+ errCh <- ln.Close()
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-s.close:
+ return
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+ }()
+
+ //
+ //s.mu.Lock()
+ //s.serving = false
+ //s.mu.Unlock()
+
+ return nil
+}
+
+func (s *Service) Close() error {
+ s.close <- struct{}{}
+ return nil
+}
+
+// Stop stops the service.
+func (s *Service) Stop() {
+ //s.mu.Lock()
+ //defer s.mu.Unlock()
+ //
+ //if s.serving {
+ // close(s.stop)
+ //}
+
+}
+
+func (s *Service) Depends() []interface{} {
+ return []interface{}{
+ s.RpcService,
+ }
+}
+
+func (s *Service) RpcService(p Plugin) error {
+ s.services = append(s.services, services{
+ service: p.RpcService(),
+ name: p.Name(),
+ })
+ return nil
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, svc interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, svc)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.configProvider == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.config.Dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go
new file mode 100644
index 00000000..29d10b0e
--- /dev/null
+++ b/plugins/rpc/rpc_test.go
@@ -0,0 +1,95 @@
+package rpc
+
+//import (
+// "testing"
+// "time"
+//
+// "github.com/stretchr/testify/assert"
+//)
+//
+//type testService struct{}
+//
+//func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
+//
+//func Test_Disabled(t *testing.T) {
+// s := &Service{}
+// ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil)
+//
+// assert.NoError(t, err)
+// assert.False(t, ok)
+//}
+//
+//func Test_RegisterNotConfigured(t *testing.T) {
+// s := &Service{}
+// assert.Error(t, s.Register("test", &testService{}))
+//
+// client, err := s.Client()
+// assert.Nil(t, client)
+// assert.Error(t, err)
+// assert.Error(t, s.Serve())
+//}
+//
+//func Test_Enabled(t *testing.T) {
+// s := &Service{}
+// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
+//
+// assert.NoError(t, err)
+// assert.True(t, ok)
+//}
+//
+//func Test_StopNonServing(t *testing.T) {
+// s := &Service{}
+// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
+//
+// assert.NoError(t, err)
+// assert.True(t, ok)
+// s.Stop()
+//}
+//
+//func Test_Serve_Errors(t *testing.T) {
+// s := &Service{}
+// ok, err := s.Init(&Config{Enable: true, Listen: "malformed"}, service.NewContainer(nil), nil)
+// assert.NoError(t, err)
+// assert.True(t, ok)
+//
+// assert.Error(t, s.Serve())
+//
+// client, err := s.Client()
+// assert.Nil(t, client)
+// assert.Error(t, err)
+//}
+//
+//func Test_Serve_Client(t *testing.T) {
+// s := &Service{}
+// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil)
+// assert.NoError(t, err)
+// assert.True(t, ok)
+//
+// defer s.Stop()
+//
+// assert.NoError(t, s.Register("test", &testService{}))
+//
+// go func() { assert.NoError(t, s.Serve()) }()
+// time.Sleep(time.Second)
+//
+// client, err := s.Client()
+// assert.NotNil(t, client)
+// assert.NoError(t, err)
+//
+// var resp string
+// assert.NoError(t, client.Call("test.Echo", "hello world", &resp))
+// assert.Equal(t, "hello world", resp)
+// assert.NoError(t, client.Close())
+//}
+//
+//func TestSetEnv(t *testing.T) {
+// s := &Service{}
+// e := env.NewService(map[string]string{})
+// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e)
+//
+// assert.NoError(t, err)
+// assert.True(t, ok)
+//
+// v, _ := e.GetEnv()
+// assert.Equal(t, "tcp://localhost:9018", v["RR_RPC"])
+//}
diff --git a/pool.go b/pool.go
index d9886360..343dedf6 100644
--- a/pool.go
+++ b/pool.go
@@ -48,7 +48,7 @@ type Pool interface {
// Exec one task with given payload and context, returns result or error.
ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
- // Exec
+ // Exec
Exec(rqs Payload) (Payload, error)
// Workers returns worker list associated with the pool.