diff options
author | Valery Piashchynski <[email protected]> | 2020-10-19 18:10:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-19 18:10:50 +0300 |
commit | 87c57fd2191dbf5ad6a69a0b6e50a01ff8d9cadd (patch) | |
tree | 9103dad9fad35f62fecb82f66a9fa48fcc607842 | |
parent | 6f39542d75d0da1e0ff09906bdd340f855a409af (diff) |
Initial implementation of the RPC RR 2.0 plugin
-rw-r--r-- | plugins/factory/app.go | 9 | ||||
-rw-r--r-- | plugins/rpc/config.go | 46 | ||||
-rw-r--r-- | plugins/rpc/config_test.go | 139 | ||||
-rw-r--r-- | plugins/rpc/plugin_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/rpc/rpc.go | 175 | ||||
-rw-r--r-- | plugins/rpc/rpc_test.go | 95 | ||||
-rw-r--r-- | pool.go | 2 |
7 files changed, 457 insertions, 10 deletions
diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 753ca2a9..48f286e7 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -111,15 +111,6 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { } } -func (app *App) Serve() chan error { - errCh := make(chan error) - return errCh -} - -func (app *App) Stop() error { - return nil -} - func (app *App) setEnv(e Env) []string { env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) for k, v := range e { diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go new file mode 100644 index 00000000..1039ee5e --- /dev/null +++ b/plugins/rpc/config.go @@ -0,0 +1,46 @@ +package rpc + +import ( + "errors" + "net" + "strings" + + "github.com/spiral/roadrunner/v2/util" +) + +// Config defines RPC service config. +type Config struct { + // Listen string + Listen string +} + +// InitDefaults allows to init blank config with pre-defined set of default values. +func (c *Config) InitDefaults() { + if c.Listen == "" { + c.Listen = "tcp://127.0.0.1:6001" + } +} + +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return nil +} + +// Listener creates new rpc socket Listener. +func (c *Config) Listener() (net.Listener, error) { + return util.CreateListener(c.Listen) +} + +// Dialer creates rpc socket Dialer. +func (c *Config) Dialer() (net.Conn, error) { + dsn := strings.Split(c.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go new file mode 100644 index 00000000..dcc24ec1 --- /dev/null +++ b/plugins/rpc/config_test.go @@ -0,0 +1,139 @@ +package rpc + +import ( + "testing" + + json "github.com/json-iterator/go" + //"github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" +) + +type testCfg struct{ cfg string } + + +func (cfg *testCfg) Unmarshal(out interface{}) error { + j := json.ConfigCompatibleWithStandardLibrary + return j.Unmarshal([]byte(cfg.cfg), out) +} + +func TestConfig_Listener(t *testing.T) { + cfg := &Config{Listen: "tcp://:18001"} + + ln, err := cfg.Listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + assert.Equal(t, "tcp", ln.Addr().Network()) + assert.Equal(t, "0.0.0.0:18001", ln.Addr().String()) +} + +func TestConfig_ListenerUnix(t *testing.T) { + cfg := &Config{Listen: "unix://file.sock"} + + ln, err := cfg.Listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + assert.Equal(t, "unix", ln.Addr().Network()) + assert.Equal(t, "file.sock", ln.Addr().String()) +} + +func Test_Config_Error(t *testing.T) { + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Listener() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error()) +} + +func Test_Config_ErrorMethod(t *testing.T) { + cfg := &Config{Listen: "xinu://unix.sock"} + + ln, err := cfg.Listener() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func TestConfig_Dialer(t *testing.T) { + cfg := &Config{Listen: "tcp://:18001"} + + ln, _ := cfg.Listener() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + conn, err := cfg.Dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("error closing the connection: error %v", err) + } + }() + + assert.Equal(t, "tcp", conn.RemoteAddr().Network()) + assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) +} + +func TestConfig_DialerUnix(t *testing.T) { + cfg := &Config{Listen: "unix://file.sock"} + + ln, _ := cfg.Listener() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + conn, err := cfg.Dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("error closing the connection: error %v", err) + } + }() + + assert.Equal(t, "unix", conn.RemoteAddr().Network()) + assert.Equal(t, "file.sock", conn.RemoteAddr().String()) +} + +func Test_Config_DialerError(t *testing.T) { + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Dialer() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error()) +} + +func Test_Config_DialerErrorMethod(t *testing.T) { + cfg := &Config{Listen: "xinu://unix.sock"} + + ln, err := cfg.Dialer() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func Test_Config_Defaults(t *testing.T) { + c := &Config{} + c.InitDefaults() + assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen) +} diff --git a/plugins/rpc/plugin_arch.drawio b/plugins/rpc/plugin_arch.drawio new file mode 100644 index 00000000..ee867b37 --- /dev/null +++ b/plugins/rpc/plugin_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2020-10-19T14:58:31.470Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="ikeu7x4QzrcZ5Y9GcAHD" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go new file mode 100644 index 00000000..eb999b2f --- /dev/null +++ b/plugins/rpc/rpc.go @@ -0,0 +1,175 @@ +package rpc + +import ( + "errors" + + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + + "net/rpc" +) + +type Plugin interface { + Name() string + RpcService() interface{} +} + +// ID contains default service name. +const ID = "rpc" + +type services struct { + service interface{} + name string +} + +// Service is RPC service. +type Service struct { + // TODO do we need a pointer here since all receivers are pointers?? + rpc *rpc.Server + configProvider config.Provider + services []services + config Config + close chan struct{} +} + +// Init rpc service. Must return true if service is enabled. +func (s *Service) Init(cfg config.Provider) error { + s.configProvider = cfg + s.close = make(chan struct{}) + + return nil +} + +func (s *Service) Configure() error { + err := s.configProvider.UnmarshalKey(ID, &s.config) + if err != nil { + return err + } + + // TODO Do we need to init defaults + if s.config.Listen == "" { + s.config.InitDefaults() + } + + server := rpc.NewServer() + if server == nil { + return errors.New("rpc server is il") + } + s.rpc = server + return nil +} + +// Serve serves the service. +func (s *Service) Serve() chan error { + errCh := make(chan error, 1) + + //s.mu.Lock() + //s.serving = true + //s.stop = make(chan interface{}) + //s.mu.Unlock() + if len(s.services) == 0 { + errCh <- errors.New("no services with RPC") + return errCh + } + + // Attach all services + for i := 0; i < len(s.services); i++ { + err := s.Register(s.services[i].name, s.services[i].service) + if err != nil { + errCh <- err + return errCh + } + } + + ln, err := s.config.Listener() + if err != nil { + errCh <- err + return errCh + } + defer func() { + errCh <- ln.Close() + }() + + go func() { + for { + select { + case <-s.close: + return + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + // + //s.mu.Lock() + //s.serving = false + //s.mu.Unlock() + + return nil +} + +func (s *Service) Close() error { + s.close <- struct{}{} + return nil +} + +// Stop stops the service. +func (s *Service) Stop() { + //s.mu.Lock() + //defer s.mu.Unlock() + // + //if s.serving { + // close(s.stop) + //} + +} + +func (s *Service) Depends() []interface{} { + return []interface{}{ + s.RpcService, + } +} + +func (s *Service) RpcService(p Plugin) error { + s.services = append(s.services, services{ + service: p.RpcService(), + name: p.Name(), + }) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, svc interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, svc) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.configProvider == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.config.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go new file mode 100644 index 00000000..29d10b0e --- /dev/null +++ b/plugins/rpc/rpc_test.go @@ -0,0 +1,95 @@ +package rpc + +//import ( +// "testing" +// "time" +// +// "github.com/stretchr/testify/assert" +//) +// +//type testService struct{} +// +//func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } +// +//func Test_Disabled(t *testing.T) { +// s := &Service{} +// ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil) +// +// assert.NoError(t, err) +// assert.False(t, ok) +//} +// +//func Test_RegisterNotConfigured(t *testing.T) { +// s := &Service{} +// assert.Error(t, s.Register("test", &testService{})) +// +// client, err := s.Client() +// assert.Nil(t, client) +// assert.Error(t, err) +// assert.Error(t, s.Serve()) +//} +// +//func Test_Enabled(t *testing.T) { +// s := &Service{} +// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) +// +// assert.NoError(t, err) +// assert.True(t, ok) +//} +// +//func Test_StopNonServing(t *testing.T) { +// s := &Service{} +// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) +// +// assert.NoError(t, err) +// assert.True(t, ok) +// s.Stop() +//} +// +//func Test_Serve_Errors(t *testing.T) { +// s := &Service{} +// ok, err := s.Init(&Config{Enable: true, Listen: "malformed"}, service.NewContainer(nil), nil) +// assert.NoError(t, err) +// assert.True(t, ok) +// +// assert.Error(t, s.Serve()) +// +// client, err := s.Client() +// assert.Nil(t, client) +// assert.Error(t, err) +//} +// +//func Test_Serve_Client(t *testing.T) { +// s := &Service{} +// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil) +// assert.NoError(t, err) +// assert.True(t, ok) +// +// defer s.Stop() +// +// assert.NoError(t, s.Register("test", &testService{})) +// +// go func() { assert.NoError(t, s.Serve()) }() +// time.Sleep(time.Second) +// +// client, err := s.Client() +// assert.NotNil(t, client) +// assert.NoError(t, err) +// +// var resp string +// assert.NoError(t, client.Call("test.Echo", "hello world", &resp)) +// assert.Equal(t, "hello world", resp) +// assert.NoError(t, client.Close()) +//} +// +//func TestSetEnv(t *testing.T) { +// s := &Service{} +// e := env.NewService(map[string]string{}) +// ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e) +// +// assert.NoError(t, err) +// assert.True(t, ok) +// +// v, _ := e.GetEnv() +// assert.Equal(t, "tcp://localhost:9018", v["RR_RPC"]) +//} @@ -48,7 +48,7 @@ type Pool interface { // Exec one task with given payload and context, returns result or error. ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) - // Exec + // Exec Exec(rqs Payload) (Payload, error) // Workers returns worker list associated with the pool. |