summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/config/tests/config_test.go3
-rw-r--r--plugins/config/viper.go7
-rw-r--r--plugins/factory/app.go18
-rw-r--r--plugins/factory/tests/factory_test.go3
-rw-r--r--plugins/rpc/config.go46
-rw-r--r--plugins/rpc/config_test.go137
-rw-r--r--plugins/rpc/doc/plugin_arch.drawio1
-rw-r--r--plugins/rpc/rpc.go157
-rw-r--r--plugins/rpc/rpc_test.go1
9 files changed, 345 insertions, 28 deletions
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
index cf5d8489..c85a841f 100644
--- a/plugins/config/tests/config_test.go
+++ b/plugins/config/tests/config_test.go
@@ -40,7 +40,7 @@ func TestViperProvider_Init(t *testing.T) {
}
// stop by CTRL+C
- c := make(chan os.Signal)
+ c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
tt := time.NewTicker(time.Second * 2)
@@ -63,5 +63,4 @@ func TestViperProvider_Init(t *testing.T) {
return
}
}
-
}
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index b276dbe2..0c34313c 100644
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -14,7 +14,6 @@ type ViperProvider struct {
Prefix string
}
-//////// ENDURE //////////
func (v *ViperProvider) Init() error {
v.viper = viper.New()
@@ -35,8 +34,6 @@ func (v *ViperProvider) Init() error {
return v.viper.ReadInConfig()
}
-///////////// VIPER ///////////////
-
// Overwrite overwrites existing config with provided values
func (v *ViperProvider) Overwrite(values map[string]string) error {
if len(values) != 0 {
@@ -71,8 +68,6 @@ func (v *ViperProvider) Has(name string) bool {
return v.viper.IsSet(name)
}
-/////////// PRIVATE //////////////
-
func parseFlag(flag string) (string, string, error) {
if !strings.Contains(flag, "=") {
return "", "", fmt.Errorf("invalid flag `%s`", flag)
@@ -88,7 +83,7 @@ func parseValue(value string) string {
if escape == '"' || escape == '\'' || escape == '`' {
value = strings.Trim(value, string(escape))
- value = strings.Replace(value, fmt.Sprintf("\\%s", string(escape)), string(escape), -1)
+ value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape))
}
return value
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index 753ca2a9..e4002963 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -33,17 +33,12 @@ type AppConfig struct {
type App struct {
cfg AppConfig
configProvider config.Provider
- factory roadrunner.Factory
}
func (app *App) Init(provider config.Provider) error {
app.cfg = AppConfig{}
app.configProvider = provider
- return nil
-}
-
-func (app *App) Configure() error {
err := app.configProvider.UnmarshalKey("app", &app.cfg)
if err != nil {
return err
@@ -56,10 +51,6 @@ func (app *App) Configure() error {
return nil
}
-func (app *App) Close() error {
- return nil
-}
-
func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
@@ -111,15 +102,6 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
}
}
-func (app *App) Serve() chan error {
- errCh := make(chan error)
- return errCh
-}
-
-func (app *App) Stop() error {
- return nil
-}
-
func (app *App) setEnv(e Env) []string {
env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
for k, v := range e {
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
index 72e28f84..5347083a 100644
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/factory/tests/factory_test.go
@@ -57,7 +57,7 @@ func TestFactory(t *testing.T) {
}
// stop by CTRL+C
- c := make(chan os.Signal)
+ c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
tt := time.NewTicker(time.Second * 2)
@@ -80,5 +80,4 @@ func TestFactory(t *testing.T) {
return
}
}
-
}
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
new file mode 100644
index 00000000..1039ee5e
--- /dev/null
+++ b/plugins/rpc/config.go
@@ -0,0 +1,46 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+// Config defines RPC service config.
+type Config struct {
+ // Listen string
+ Listen string
+}
+
+// InitDefaults allows to init blank config with pre-defined set of default values.
+func (c *Config) InitDefaults() {
+ if c.Listen == "" {
+ c.Listen = "tcp://127.0.0.1:6001"
+ }
+}
+
+// Valid returns nil if config is valid.
+func (c *Config) Valid() error {
+ if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
+ return errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
+ }
+
+ return nil
+}
+
+// Listener creates new rpc socket Listener.
+func (c *Config) Listener() (net.Listener, error) {
+ return util.CreateListener(c.Listen)
+}
+
+// Dialer creates rpc socket Dialer.
+func (c *Config) Dialer() (net.Conn, error) {
+ dsn := strings.Split(c.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go
new file mode 100644
index 00000000..36927dd2
--- /dev/null
+++ b/plugins/rpc/config_test.go
@@ -0,0 +1,137 @@
+package rpc
+
+import (
+ "testing"
+
+ json "github.com/json-iterator/go"
+ "github.com/stretchr/testify/assert"
+)
+
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ j := json.ConfigCompatibleWithStandardLibrary
+ return j.Unmarshal([]byte(cfg.cfg), out)
+}
+
+func TestConfig_Listener(t *testing.T) {
+ cfg := &Config{Listen: "tcp://:18001"}
+
+ ln, err := cfg.Listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "tcp", ln.Addr().Network())
+ assert.Equal(t, "0.0.0.0:18001", ln.Addr().String())
+}
+
+func TestConfig_ListenerUnix(t *testing.T) {
+ cfg := &Config{Listen: "unix://file.sock"}
+
+ ln, err := cfg.Listener()
+ assert.NoError(t, err)
+ assert.NotNil(t, ln)
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "unix", ln.Addr().Network())
+ assert.Equal(t, "file.sock", ln.Addr().String())
+}
+
+func Test_Config_Error(t *testing.T) {
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error())
+}
+
+func Test_Config_ErrorMethod(t *testing.T) {
+ cfg := &Config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.Listener()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func TestConfig_Dialer(t *testing.T) {
+ cfg := &Config{Listen: "tcp://:18001"}
+
+ ln, _ := cfg.Listener()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ conn, err := cfg.Dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer func() {
+ err := conn.Close()
+ if err != nil {
+ t.Errorf("error closing the connection: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "tcp", conn.RemoteAddr().Network())
+ assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
+}
+
+func TestConfig_DialerUnix(t *testing.T) {
+ cfg := &Config{Listen: "unix://file.sock"}
+
+ ln, _ := cfg.Listener()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ conn, err := cfg.Dialer()
+ assert.NoError(t, err)
+ assert.NotNil(t, conn)
+ defer func() {
+ err := conn.Close()
+ if err != nil {
+ t.Errorf("error closing the connection: error %v", err)
+ }
+ }()
+
+ assert.Equal(t, "unix", conn.RemoteAddr().Network())
+ assert.Equal(t, "file.sock", conn.RemoteAddr().String())
+}
+
+func Test_Config_DialerError(t *testing.T) {
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+ assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://file.sock)", err.Error())
+}
+
+func Test_Config_DialerErrorMethod(t *testing.T) {
+ cfg := &Config{Listen: "xinu://unix.sock"}
+
+ ln, err := cfg.Dialer()
+ assert.Nil(t, ln)
+ assert.Error(t, err)
+}
+
+func Test_Config_Defaults(t *testing.T) {
+ c := &Config{}
+ c.InitDefaults()
+ assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen)
+}
diff --git a/plugins/rpc/doc/plugin_arch.drawio b/plugins/rpc/doc/plugin_arch.drawio
new file mode 100644
index 00000000..dec5f0b2
--- /dev/null
+++ b/plugins/rpc/doc/plugin_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2020-10-19T17:14:19.125Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="2J39x4EyFr1zaE9BXKM4" version="13.7.9" type="device"><diagram id="q2oMKs6VHyn7y0AfAXBL" name="Page-1">7Vttc9o4EP41zLQfksE2GPIxQHPXu7RlQntt7ptiC1sX2XJlOUB//a1sGdtIJDQFnE6YyUys1YutfR7trlai44yj5R8cJeEH5mPasbv+suNMOrZtORcO/JOSVSEZWv1CEHDiq0aVYEZ+YCXsKmlGfJw2GgrGqCBJU+ixOMaeaMgQ52zRbDZntPnWBAVYE8w8RHXpV+KLUEkt96Kq+BOTIFSvHtqDoiJCZWM1kzREPlvURM67jjPmjIniKVqOMZXKK/VS9LvaUrv+MI5jsUuHL/zu0yx7//HT3Pln8vfN59vvS/usVHMqVuWMsQ8KUEXGRcgCFiP6rpKOOMtiH8thu1Cq2lwzloDQAuF/WIiVQhNlgoEoFBFVtXhJxLfa860c6ryvSpOlGjkvrMpCLPjqW71Q6yWLVbe8VPabs1hcoYhQKRizjBPMYcIf8UJVqq+8gGKhC6mArTpWohQG8lSrfz88xF8/ds/+uiLe7MsXtLiyZ2clVxEPsHik3WDNBFhCmEUYvh36cUyRIA/N70CKy8G6XQU3PCjEfwZ9q030K8RvazVPoV8BftvA+7dE33KOBP9jX/mAaKbedDOFkbpTmgUk1qjRBH4REoFnCcr1sADj3wT55xVv0PMD5gIvayJdU6rWGSi3otyMYw3OlWRRme21VwlrFtsdHEi9jqbe9zERha+ak0DTL0xVNJWIKAliePZAMaA+ZyQVQsA5XaqKiPh+sShxSn6gu3woiU7CSCzyCfVHnf5EjgXrMC103go+3Q18hho6QwM4pfPcOzg9DZwJTnDspyBk8Rqk8ylnDxCB8N8DLcveD1z2BlxWWa4vpu4x8epreOmuK/YvZcQnIaAoTYm34XeO5kMMun/aFRjdj45QDYG+AYBStrMHUW+YSgpWBOgNtxCgHKJwgapXPercGKhvbwxkbQxUKEYbKCfJetrP542r8aa0vt0U9gsE1rpzKfWVeK97ia+Xc41glolhB1viA32Jj+3O5YhIXc9loAHFEczdpRKWO95Ay/2eyZ1UrqqzQq8S14tkmeurrIanQP0vRvmVQYA052WwVAwHE7+rXrHBp/bCI3f4tPu1jMGReyCwLT06KoLPVPDMExnHmvrSBYkoinGpIVWz07oUcm8y8kJC/Wu0YpmcXiqQd1+WRiHj5AcMi0qIoJqXMNhuo8VM9lQLO1/oeFqiY22IPqBlo+E1SoUSeIxSlKTkbj2NCGwhiUdMCBbt0/k8P47uuQarULapE8Vye4diytDg+ke7R2hAKHaPx4wyIMYkZgWBCKUbopJDFM/FVgalsOEhcXCdt5n0KsmNUoUUMeg7p3kgEoI/wHG+axZIbPUHI9DyWIYl4BnsMZStqpw7iwT22WMWw1wQycHFwKMFTsUvU+Tx1fk0cUr34e7GE/tQBqV0SxpNpJGeYf6QK+VNjMX5TeK9PbGlTbb07ZbZYl1sYUsKTCEeltvAIlKr+aNuSqHqxJw2mTMwBC7HZY6eOSiYMydYni3IeHH8aILnxIk9c8Lq9tomxQ7pCUpyqAszUZ4lWc/iw3qXqQjwOc+8n1kaSRydJI6BEBTdYTqF3WixH57woq1h0/ryueDsGLAOD0UFPeNQ2AcYPmT+G7FK8NvCTMjHkzdply1HdCfmIzhDHvMIR3Av9jDVrKTOjjnUCzPaRzpN1Ra+Ciafk9Xo/nK6wmAsfpMMhrZ+DazZmsHoNTNdPcvgD1xDpmuwB4dgpIX9dLxY8aTKdZ78wp7osn2t/lQyw8SZg3kFPTmqcSZGkTIsgNeJLS2yxZTMOCpb9IizMigcByQFmyITGlYxV4A2o0iqyc+PvOGvYYPmTNbl2Xgzq17Wgdie/Ia1cYFkqO8pHftAx2FGVPUMVVJkul8VLK61cXJl67gc6pTSbAvcVgJ245259TW5Vm5M1k6i9xPlO7uG+b1Ww3zdOVdXCk5h/pHsgtM0C64p7WNywqWz3j8tdsgLX0tXHJ+itiNFbVsu176UIN/SL7xMOQOFR2lOl7a9fN3MP4rYHpbzxq7dsGk/1O1QMzT6nYOAqSAZFqaPvY78hYecQIBjzJGQgbNgsk2UeaH8Ji93RdLvefdY3ohDeZyNlx7G8iGjJMqvA5/pV61fE9YGy93fU6ANxer3NcWNwupXSs67/wE=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
new file mode 100644
index 00000000..6568eea3
--- /dev/null
+++ b/plugins/rpc/rpc.go
@@ -0,0 +1,157 @@
+package rpc
+
+import (
+ "errors"
+
+ "github.com/spiral/goridge/v2"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+
+ "net/rpc"
+)
+
+type PluginRpc interface {
+ Name() string
+ RpcService() (interface{}, error)
+}
+
+// ID contains default service name.
+const ID = "rpc"
+
+type services struct {
+ service interface{}
+ name string
+}
+
+// Service is RPC service.
+type Service struct {
+ // TODO do we need a pointer here since all receivers are pointers??
+ rpc *rpc.Server
+ configProvider config.Provider
+ services []services
+ config Config
+ close chan struct{}
+}
+
+// Init rpc service. Must return true if service is enabled.
+func (s *Service) Init(cfg config.Provider) error {
+ s.configProvider = cfg
+ err := s.configProvider.UnmarshalKey(ID, &s.config)
+ if err != nil {
+ return err
+ }
+
+ // TODO Do we need to init defaults
+ if s.config.Listen == "" {
+ s.config.InitDefaults()
+ }
+
+ s.close = make(chan struct{})
+
+ return nil
+}
+
+// Serve serves the service.
+func (s *Service) Serve() chan error {
+ errCh := make(chan error, 1)
+ server := rpc.NewServer()
+ if server == nil {
+ errCh <- errors.New("rpc server is nil")
+ return errCh
+ }
+ s.rpc = server
+
+ if len(s.services) == 0 {
+ errCh <- errors.New("no services with RPC")
+ return errCh
+ }
+
+ // Attach all services
+ for i := 0; i < len(s.services); i++ {
+ err := s.Register(s.services[i].name, s.services[i].service)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ }
+
+ ln, err := s.config.Listener()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ go func() {
+ for {
+ select {
+ case <-s.close:
+ // log error
+ errCh <- ln.Close()
+ return
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+ }()
+
+ return nil
+}
+
+// Stop stops the service.
+func (s *Service) Stop() error {
+ s.close <- struct{}{}
+ return nil
+}
+
+func (s *Service) Depends() []interface{} {
+ return []interface{}{
+ s.RpcService,
+ }
+}
+
+func (s *Service) RpcService(p PluginRpc) error {
+ service, err := p.RpcService()
+ if err != nil {
+ return err
+ }
+
+ s.services = append(s.services, services{
+ service: service,
+ name: p.Name(),
+ })
+ return nil
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, svc interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, svc)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.configProvider == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.config.Dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go
new file mode 100644
index 00000000..9ab1e3e8
--- /dev/null
+++ b/plugins/rpc/rpc_test.go
@@ -0,0 +1 @@
+package rpc