diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/config/tests/config_test.go | 3 | ||||
-rw-r--r-- | plugins/config/viper.go | 7 | ||||
-rw-r--r-- | plugins/factory/app.go | 18 | ||||
-rw-r--r-- | plugins/factory/tests/factory_test.go | 3 | ||||
-rw-r--r-- | plugins/rpc/config.go | 46 | ||||
-rw-r--r-- | plugins/rpc/config_test.go | 137 | ||||
-rw-r--r-- | plugins/rpc/doc/plugin_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/rpc/rpc.go | 157 | ||||
-rw-r--r-- | plugins/rpc/rpc_test.go | 1 |
9 files changed, 345 insertions, 28 deletions
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go index cf5d8489..c85a841f 100644 --- a/plugins/config/tests/config_test.go +++ b/plugins/config/tests/config_test.go @@ -40,7 +40,7 @@ func TestViperProvider_Init(t *testing.T) { } // stop by CTRL+C - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) tt := time.NewTicker(time.Second * 2) @@ -63,5 +63,4 @@ func TestViperProvider_Init(t *testing.T) { return } } - } diff --git a/plugins/config/viper.go b/plugins/config/viper.go index b276dbe2..0c34313c 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,7 +14,6 @@ type ViperProvider struct { Prefix string } -//////// ENDURE ////////// func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -35,8 +34,6 @@ func (v *ViperProvider) Init() error { return v.viper.ReadInConfig() } -///////////// VIPER /////////////// - // Overwrite overwrites existing config with provided values func (v *ViperProvider) Overwrite(values map[string]string) error { if len(values) != 0 { @@ -71,8 +68,6 @@ func (v *ViperProvider) Has(name string) bool { return v.viper.IsSet(name) } -/////////// PRIVATE ////////////// - func parseFlag(flag string) (string, string, error) { if !strings.Contains(flag, "=") { return "", "", fmt.Errorf("invalid flag `%s`", flag) @@ -88,7 +83,7 @@ func parseValue(value string) string { if escape == '"' || escape == '\'' || escape == '`' { value = strings.Trim(value, string(escape)) - value = strings.Replace(value, fmt.Sprintf("\\%s", string(escape)), string(escape), -1) + value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape)) } return value diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 753ca2a9..e4002963 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -33,17 +33,12 @@ type AppConfig struct { type App struct { cfg AppConfig configProvider config.Provider - factory roadrunner.Factory } func (app *App) Init(provider config.Provider) error { app.cfg = AppConfig{} app.configProvider = provider - return nil -} - -func (app *App) Configure() error { err := app.configProvider.UnmarshalKey("app", &app.cfg) if err != nil { return err @@ -56,10 +51,6 @@ func (app *App) Configure() error { return nil } -func (app *App) Close() error { - return nil -} - func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config @@ -111,15 +102,6 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { } } -func (app *App) Serve() chan error { - errCh := make(chan error) - return errCh -} - -func (app *App) Stop() error { - return nil -} - func (app *App) setEnv(e Env) []string { env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) for k, v := range e { diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 72e28f84..5347083a 100644 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -57,7 +57,7 @@ func TestFactory(t *testing.T) { } // stop by CTRL+C - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) tt := time.NewTicker(time.Second * 2) @@ -80,5 +80,4 @@ func TestFactory(t *testing.T) { return } } - } diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go new file mode 100644 index 00000000..1039ee5e --- /dev/null +++ b/plugins/rpc/config.go @@ -0,0 +1,46 @@ +package rpc + +import ( + "errors" + "net" + "strings" + + "github.com/spiral/roadrunner/v2/util" +) + +// Config defines RPC service config. +type Config struct { + // Listen string + Listen string +} + +// InitDefaults allows to init blank config with pre-defined set of default values. +func (c *Config) InitDefaults() { + if c.Listen == "" { + c.Listen = "tcp://127.0.0.1:6001" + } +} + +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return nil +} + +// Listener creates new rpc socket Listener. +func (c *Config) Listener() (net.Listener, error) { + return util.CreateListener(c.Listen) +} + +// Dialer creates rpc socket Dialer. +func (c *Config) Dialer() (net.Conn, error) { + dsn := strings.Split(c.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go new file mode 100644 index 00000000..36927dd2 --- /dev/null +++ b/plugins/rpc/config_test.go @@ -0,0 +1,137 @@ +package rpc + +import ( + "testing" + + json "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" +) + +type testCfg struct{ cfg string } + +func (cfg *testCfg) Unmarshal(out interface{}) error { + j := json.ConfigCompatibleWithStandardLibrary + return j.Unmarshal([]byte(cfg.cfg), out) +} + +func TestConfig_Listener(t *testing.T) { + cfg := &Config{Listen: "tcp://:18001"} + + ln, err := cfg.Listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + assert.Equal(t, "tcp", ln.Addr().Network()) + assert.Equal(t, "0.0.0.0:18001", ln.Addr().String()) +} + +func TestConfig_ListenerUnix(t *testing.T) { + cfg := &Config{Listen: "unix://file.sock"} + + ln, err := cfg.Listener() + assert.NoError(t, err) + assert.NotNil(t, ln) + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + assert.Equal(t, "unix", ln.Addr().Network()) + assert.Equal(t, "file.sock", ln.Addr().String()) +} + +func Test_Config_Error(t *testing.T) { + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Listener() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error()) +} + +func Test_Config_ErrorMethod(t *testing.T) { + cfg := &Config{Listen: "xinu://unix.sock"} + + ln, err := cfg.Listener() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func TestConfig_Dialer(t *testing.T) { + cfg := &Config{Listen: "tcp://:18001"} + + ln, _ := cfg.Listener() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + conn, err := cfg.Dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("error closing the connection: error %v", err) + } + }() + + assert.Equal(t, "tcp", conn.RemoteAddr().Network()) + assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) +} + +func TestConfig_DialerUnix(t *testing.T) { + cfg := &Config{Listen: "unix://file.sock"} + + ln, _ := cfg.Listener() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + conn, err := cfg.Dialer() + assert.NoError(t, err) + assert.NotNil(t, conn) + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("error closing the connection: error %v", err) + } + }() + + assert.Equal(t, "unix", conn.RemoteAddr().Network()) + assert.Equal(t, "file.sock", conn.RemoteAddr().String()) +} + +func Test_Config_DialerError(t *testing.T) { + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Dialer() + assert.Nil(t, ln) + assert.Error(t, err) + assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error()) +} + +func Test_Config_DialerErrorMethod(t *testing.T) { + cfg := &Config{Listen: "xinu://unix.sock"} + + ln, err := cfg.Dialer() + assert.Nil(t, ln) + assert.Error(t, err) +} + +func Test_Config_Defaults(t *testing.T) { + c := &Config{} + c.InitDefaults() + assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen) +} diff --git a/plugins/rpc/doc/plugin_arch.drawio b/plugins/rpc/doc/plugin_arch.drawio new file mode 100644 index 00000000..dec5f0b2 --- /dev/null +++ b/plugins/rpc/doc/plugin_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2020-10-19T17:14:19.125Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="2J39x4EyFr1zaE9BXKM4" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go new file mode 100644 index 00000000..6568eea3 --- /dev/null +++ b/plugins/rpc/rpc.go @@ -0,0 +1,157 @@ +package rpc + +import ( + "errors" + + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + + "net/rpc" +) + +type PluginRpc interface { + Name() string + RpcService() (interface{}, error) +} + +// ID contains default service name. +const ID = "rpc" + +type services struct { + service interface{} + name string +} + +// Service is RPC service. +type Service struct { + // TODO do we need a pointer here since all receivers are pointers?? + rpc *rpc.Server + configProvider config.Provider + services []services + config Config + close chan struct{} +} + +// Init rpc service. Must return true if service is enabled. +func (s *Service) Init(cfg config.Provider) error { + s.configProvider = cfg + err := s.configProvider.UnmarshalKey(ID, &s.config) + if err != nil { + return err + } + + // TODO Do we need to init defaults + if s.config.Listen == "" { + s.config.InitDefaults() + } + + s.close = make(chan struct{}) + + return nil +} + +// Serve serves the service. +func (s *Service) Serve() chan error { + errCh := make(chan error, 1) + server := rpc.NewServer() + if server == nil { + errCh <- errors.New("rpc server is nil") + return errCh + } + s.rpc = server + + if len(s.services) == 0 { + errCh <- errors.New("no services with RPC") + return errCh + } + + // Attach all services + for i := 0; i < len(s.services); i++ { + err := s.Register(s.services[i].name, s.services[i].service) + if err != nil { + errCh <- err + return errCh + } + } + + ln, err := s.config.Listener() + if err != nil { + errCh <- err + return errCh + } + + go func() { + for { + select { + case <-s.close: + // log error + errCh <- ln.Close() + return + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + }() + + return nil +} + +// Stop stops the service. +func (s *Service) Stop() error { + s.close <- struct{}{} + return nil +} + +func (s *Service) Depends() []interface{} { + return []interface{}{ + s.RpcService, + } +} + +func (s *Service) RpcService(p PluginRpc) error { + service, err := p.RpcService() + if err != nil { + return err + } + + s.services = append(s.services, services{ + service: service, + name: p.Name(), + }) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, svc interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, svc) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.configProvider == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.config.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go new file mode 100644 index 00000000..9ab1e3e8 --- /dev/null +++ b/plugins/rpc/rpc_test.go @@ -0,0 +1 @@ +package rpc |