summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-05 17:58:23 +0300
committerGitHub <[email protected]>2020-11-05 17:58:23 +0300
commit9fbe7726dd55cfedda724b7644e1b6bf7c1a6cb4 (patch)
treef2bf9b97d38103de51e2d140aa76666f9c6341c8 /plugins
parentedc45b3e24afdb5e56e74ffbbbd50e0e3b04922b (diff)
parent73da7300fcc9b8b22faa1c91fc1faff22ab944ff (diff)
Merge pull request #388 from spiral/enhancement/testsv2.0.0-alpha15
Tests for the new plugins
Diffstat (limited to 'plugins')
-rw-r--r--plugins/app/plugin.go (renamed from plugins/app/app.go)33
-rw-r--r--plugins/app/tests/app_test.go358
-rw-r--r--plugins/app/tests/configs/.rr-no-app-section.yaml9
-rw-r--r--plugins/app/tests/configs/.rr-sockets.yaml9
-rw-r--r--plugins/app/tests/configs/.rr-tcp.yaml9
-rw-r--r--plugins/app/tests/configs/.rr-wrong-command.yaml (renamed from plugins/app/tests/.rr.yaml)4
-rw-r--r--plugins/app/tests/configs/.rr-wrong-relay.yaml9
-rw-r--r--plugins/app/tests/configs/.rr.yaml9
-rw-r--r--plugins/app/tests/hello.php1
-rw-r--r--plugins/app/tests/plugin_1.go55
-rw-r--r--plugins/app/tests/plugin_2.go88
-rw-r--r--plugins/app/tests/plugin_pipes.go130
-rw-r--r--plugins/app/tests/plugin_sockets.go111
-rw-r--r--plugins/app/tests/plugin_tcp.go111
-rw-r--r--plugins/app/tests/socket.php25
-rw-r--r--plugins/app/tests/tcp.php20
-rwxr-xr-xplugins/config/configurer.go (renamed from plugins/config/provider.go)4
-rwxr-xr-xplugins/config/plugin.go (renamed from plugins/config/viper.go)12
-rwxr-xr-xplugins/config/tests/.rr.yaml10
-rwxr-xr-xplugins/config/tests/config_test.go4
-rwxr-xr-xplugins/config/tests/plugin1.go4
-rw-r--r--plugins/logger/encoder.go2
-rw-r--r--plugins/logger/plugin.go (renamed from plugins/logger/zap_logger.go)31
-rw-r--r--plugins/logger/tests/.rr.yaml0
-rw-r--r--plugins/logger/tests/logger_test.go (renamed from plugins/app/tests/factory_test.go)20
-rw-r--r--plugins/logger/tests/plugin.go40
-rwxr-xr-xplugins/rpc/plugin.go (renamed from plugins/rpc/rpc.go)92
-rw-r--r--plugins/rpc/tests/.rr-rpc-disabled.yaml3
-rw-r--r--plugins/rpc/tests/.rr.yaml3
-rw-r--r--plugins/rpc/tests/plugin1.go42
-rw-r--r--plugins/rpc/tests/plugin2.go54
-rw-r--r--plugins/rpc/tests/rpc_test.go169
32 files changed, 1215 insertions, 256 deletions
diff --git a/plugins/app/app.go b/plugins/app/plugin.go
index 312e5bc6..839685bd 100644
--- a/plugins/app/app.go
+++ b/plugins/app/plugin.go
@@ -7,10 +7,9 @@ import (
"os/exec"
"strings"
- "go.uber.org/zap"
-
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/log"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
@@ -26,15 +25,15 @@ type WorkerFactory interface {
NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
}
-// App manages worker
-type App struct {
+// Plugin manages worker
+type Plugin struct {
cfg Config
- log *zap.Logger
+ log log.Logger
factory roadrunner.Factory
}
// Init application provider.
-func (app *App) Init(cfg config.Provider, log *zap.Logger) error {
+func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
@@ -46,11 +45,11 @@ func (app *App) Init(cfg config.Provider, log *zap.Logger) error {
}
// Name contains service name.
-func (app *App) Name() string {
+func (app *Plugin) Name() string {
return ServiceName
}
-func (app *App) Serve() chan error {
+func (app *Plugin) Serve() chan error {
errCh := make(chan error, 1)
var err error
@@ -62,7 +61,7 @@ func (app *App) Serve() chan error {
return errCh
}
-func (app *App) Stop() error {
+func (app *Plugin) Stop() error {
if app.factory == nil {
return nil
}
@@ -71,7 +70,7 @@ func (app *App) Stop() error {
}
// CmdFactory provides worker command factory assocated with given context.
-func (app *App) CmdFactory(env Env) (func() *exec.Cmd, error) {
+func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
@@ -97,7 +96,7 @@ func (app *App) CmdFactory(env Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
spawnCmd, err := app.CmdFactory(env)
if err != nil {
return nil, err
@@ -114,7 +113,7 @@ func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase,
}
// NewWorkerPool issues new worker pool.
-func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
spawnCmd, err := app.CmdFactory(env)
if err != nil {
return nil, err
@@ -131,7 +130,7 @@ func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env En
}
// creates relay and worker factory.
-func (app *App) initFactory() (roadrunner.Factory, error) {
+func (app *Plugin) initFactory() (roadrunner.Factory, error) {
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
@@ -157,7 +156,7 @@ func (app *App) initFactory() (roadrunner.Factory, error) {
}
}
-func (app *App) setEnv(e Env) []string {
+func (app *Plugin) setEnv(e Env) []string {
env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
for k, v := range e {
env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
@@ -166,13 +165,13 @@ func (app *App) setEnv(e Env) []string {
return env
}
-func (app *App) collectLogs(event interface{}) {
+func (app *Plugin) collectLogs(event interface{}) {
if we, ok := event.(roadrunner.WorkerEvent); ok {
switch we.Event {
case roadrunner.EventWorkerError:
- app.log.Error(we.Payload.(error).Error(), zap.Int64("pid", we.Worker.Pid()))
+ app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
case roadrunner.EventWorkerLog:
- app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), zap.Int64("pid", we.Worker.Pid()))
+ app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
}
}
}
diff --git a/plugins/app/tests/app_test.go b/plugins/app/tests/app_test.go
new file mode 100644
index 00000000..3c416b59
--- /dev/null
+++ b/plugins/app/tests/app_test.go
@@ -0,0 +1,358 @@
+package tests
+
+import (
+ "os"
+ "os/signal"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestAppPipes(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rr.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := container.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // stop by CTRL+C
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+
+ // stop after 10 seconds
+ tt := time.NewTicker(time.Second * 10)
+
+ for {
+ select {
+ case e := <-errCh:
+ assert.NoError(t, e.Error)
+ assert.NoError(t, container.Stop())
+ return
+ case <-c:
+ er := container.Stop()
+ if er != nil {
+ panic(er)
+ }
+ return
+ case <-tt.C:
+ tt.Stop()
+ assert.NoError(t, container.Stop())
+ return
+ }
+ }
+}
+
+func TestAppSockets(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rr-sockets.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo2{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := container.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // stop by CTRL+C
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+
+ // stop after 10 seconds
+ tt := time.NewTicker(time.Second * 10)
+
+ for {
+ select {
+ case e := <-errCh:
+ assert.NoError(t, e.Error)
+ assert.NoError(t, container.Stop())
+ return
+ case <-c:
+ er := container.Stop()
+ if er != nil {
+ panic(er)
+ }
+ return
+ case <-tt.C:
+ tt.Stop()
+ assert.NoError(t, container.Stop())
+ return
+ }
+ }
+}
+
+func TestAppTCP(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rr-tcp.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo3{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := container.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // stop by CTRL+C
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+
+ // stop after 10 seconds
+ tt := time.NewTicker(time.Second * 10)
+
+ for {
+ select {
+ case e := <-errCh:
+ assert.NoError(t, e.Error)
+ assert.NoError(t, container.Stop())
+ return
+ case <-c:
+ er := container.Stop()
+ if er != nil {
+ panic(er)
+ }
+ return
+ case <-tt.C:
+ tt.Stop()
+ assert.NoError(t, container.Stop())
+ return
+ }
+ }
+}
+
+func TestAppWrongConfig(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rrrrrrrrrr.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo3{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, container.Init())
+}
+
+func TestAppWrongRelay(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rr-wrong-relay.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo3{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = container.Serve()
+ assert.Error(t, err)
+}
+
+func TestAppWrongCommand(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rr-wrong-command.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo3{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = container.Serve()
+ assert.Error(t, err)
+}
+
+func TestAppNoAppSectionInConfig(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.Viper{}
+ vp.Path = "configs/.rr-wrong-command.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo3{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = container.Serve()
+ assert.Error(t, err)
+}
diff --git a/plugins/app/tests/configs/.rr-no-app-section.yaml b/plugins/app/tests/configs/.rr-no-app-section.yaml
new file mode 100644
index 00000000..d129ae8a
--- /dev/null
+++ b/plugins/app/tests/configs/.rr-no-app-section.yaml
@@ -0,0 +1,9 @@
+upp:
+ command: "php ../../../tests/client.php echo pipes"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "pipes"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/app/tests/configs/.rr-sockets.yaml b/plugins/app/tests/configs/.rr-sockets.yaml
new file mode 100644
index 00000000..9bd62693
--- /dev/null
+++ b/plugins/app/tests/configs/.rr-sockets.yaml
@@ -0,0 +1,9 @@
+app:
+ command: "php socket.php"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "unix://unix.sock"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/app/tests/configs/.rr-tcp.yaml b/plugins/app/tests/configs/.rr-tcp.yaml
new file mode 100644
index 00000000..c5a26d37
--- /dev/null
+++ b/plugins/app/tests/configs/.rr-tcp.yaml
@@ -0,0 +1,9 @@
+app:
+ command: "php tcp.php"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "tcp://localhost:9999"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/app/tests/.rr.yaml b/plugins/app/tests/configs/.rr-wrong-command.yaml
index 171f51dc..4bd019d3 100644
--- a/plugins/app/tests/.rr.yaml
+++ b/plugins/app/tests/configs/.rr-wrong-command.yaml
@@ -1,9 +1,9 @@
app:
- command: "php hello.php"
+ command: "php some_absent_file.php"
user: ""
group: ""
env:
"RR_CONFIG": "/some/place/on/the/C134"
"RR_CONFIG2": "C138"
relay: "pipes"
- relayTimeout: "20s" \ No newline at end of file
+ relayTimeout: "20s"
diff --git a/plugins/app/tests/configs/.rr-wrong-relay.yaml b/plugins/app/tests/configs/.rr-wrong-relay.yaml
new file mode 100644
index 00000000..d8ffe8f8
--- /dev/null
+++ b/plugins/app/tests/configs/.rr-wrong-relay.yaml
@@ -0,0 +1,9 @@
+app:
+ command: "php ../../../tests/client.php echo pipes"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "pupes"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/app/tests/configs/.rr.yaml b/plugins/app/tests/configs/.rr.yaml
new file mode 100644
index 00000000..221aff92
--- /dev/null
+++ b/plugins/app/tests/configs/.rr.yaml
@@ -0,0 +1,9 @@
+app:
+ command: "php ../../../tests/client.php echo pipes"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "pipes"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/app/tests/hello.php b/plugins/app/tests/hello.php
deleted file mode 100644
index bf9e82cc..00000000
--- a/plugins/app/tests/hello.php
+++ /dev/null
@@ -1 +0,0 @@
-<?php echo "hello1 - " . time(); \ No newline at end of file
diff --git a/plugins/app/tests/plugin_1.go b/plugins/app/tests/plugin_1.go
deleted file mode 100644
index 7259ea9d..00000000
--- a/plugins/app/tests/plugin_1.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package tests
-
-import (
- "errors"
- "fmt"
-
- "github.com/spiral/roadrunner/v2/plugins/app"
- "github.com/spiral/roadrunner/v2/plugins/config"
-)
-
-type Foo struct {
- configProvider config.Provider
- spawner app.WorkerFactory
-}
-
-func (f *Foo) Init(p config.Provider, spw app.WorkerFactory) error {
- f.configProvider = p
- f.spawner = spw
- return nil
-}
-
-func (f *Foo) Serve() chan error {
- errCh := make(chan error, 1)
-
- r := &app.Config{}
- err := f.configProvider.UnmarshalKey("app", r)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- cmd, err := f.spawner.CmdFactory(nil)
- if err != nil {
- errCh <- err
- return errCh
- }
- if cmd == nil {
- errCh <- errors.New("command is nil")
- return errCh
- }
- a := cmd()
- out, err := a.Output()
- if err != nil {
- errCh <- err
- return errCh
- }
-
- fmt.Println(string(out))
-
- return errCh
-}
-
-func (f *Foo) Stop() error {
- return nil
-}
diff --git a/plugins/app/tests/plugin_2.go b/plugins/app/tests/plugin_2.go
deleted file mode 100644
index fbb9ca11..00000000
--- a/plugins/app/tests/plugin_2.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package tests
-
-import (
- "context"
- "errors"
- "fmt"
- "time"
-
- "github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/app"
- "github.com/spiral/roadrunner/v2/plugins/config"
-)
-
-type Foo2 struct {
- configProvider config.Provider
- wf app.WorkerFactory
-}
-
-func (f *Foo2) Init(p config.Provider, workerFactory app.WorkerFactory) error {
- f.configProvider = p
- f.wf = workerFactory
- return nil
-}
-
-func (f *Foo2) Serve() chan error {
- errCh := make(chan error, 1)
-
- r := &app.Config{}
- err := f.configProvider.UnmarshalKey("app", r)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- cmd, err := f.wf.CmdFactory(nil)
- if err != nil {
- errCh <- err
- return errCh
- }
- if cmd == nil {
- errCh <- errors.New("command is nil")
- return errCh
- }
- a := cmd()
- out, err := a.Output()
- if err != nil {
- errCh <- err
- return errCh
- }
-
- w, err := f.wf.NewWorker(context.Background(), nil)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- _ = w
-
- poolConfig := roadrunner.Config{
- NumWorkers: 10,
- MaxJobs: 100,
- AllocateTimeout: time.Second * 10,
- DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
- WatchTick: 60,
- TTL: 1000,
- IdleTTL: 10,
- ExecTTL: 10,
- MaxWorkerMemory: 1000,
- },
- }
-
- pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- _ = pool
-
- fmt.Println(string(out))
-
- return errCh
-}
-
-func (f *Foo2) Stop() error {
- return nil
-}
diff --git a/plugins/app/tests/plugin_pipes.go b/plugins/app/tests/plugin_pipes.go
new file mode 100644
index 00000000..fc999718
--- /dev/null
+++ b/plugins/app/tests/plugin_pipes.go
@@ -0,0 +1,130 @@
+package tests
+
+import (
+ "context"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+const ConfigSection = "app"
+const Response = "test"
+
+var testPoolConfig = roadrunner.Config{
+ NumWorkers: 10,
+ MaxJobs: 100,
+ AllocateTimeout: time.Second * 10,
+ DestroyTimeout: time.Second * 10,
+ Supervisor: &roadrunner.SupervisorConfig{
+ WatchTick: 60,
+ TTL: 1000,
+ IdleTTL: 10,
+ ExecTTL: 10,
+ MaxWorkerMemory: 1000,
+ },
+}
+
+type Foo struct {
+ configProvider config.Configurer
+ wf app.WorkerFactory
+ pool roadrunner.Pool
+}
+
+func (f *Foo) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+ f.configProvider = p
+ f.wf = workerFactory
+ return nil
+}
+
+func (f *Foo) Serve() chan error {
+ const op = errors.Op("serve")
+
+ // test payload for echo
+ r := roadrunner.Payload{
+ Context: nil,
+ Body: []byte(Response),
+ }
+
+ errCh := make(chan error, 1)
+
+ conf := &app.Config{}
+ var err error
+ err = f.configProvider.UnmarshalKey(ConfigSection, conf)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test CMDFactory
+ cmd, err := f.wf.CmdFactory(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.E(op, "command is nil")
+ return errCh
+ }
+
+ // test worker creation
+ w, err := f.wf.NewWorker(context.Background(), nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test that our worker is functional
+ sw, err := roadrunner.NewSyncWorker(w)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ rsp, err := sw.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ if string(rsp.Body) != Response {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ // should not be errors
+ err = sw.Stop(context.Background())
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test pool
+ f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test pool execution
+ rsp, err = f.pool.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // echo of the "test" should be -> test
+ if string(rsp.Body) != Response {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ return errCh
+}
+
+func (f *Foo) Stop() error {
+ f.pool.Destroy(context.Background())
+ return nil
+}
diff --git a/plugins/app/tests/plugin_sockets.go b/plugins/app/tests/plugin_sockets.go
new file mode 100644
index 00000000..585264f6
--- /dev/null
+++ b/plugins/app/tests/plugin_sockets.go
@@ -0,0 +1,111 @@
+package tests
+
+import (
+ "context"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Foo2 struct {
+ configProvider config.Configurer
+ wf app.WorkerFactory
+ pool roadrunner.Pool
+}
+
+func (f *Foo2) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+ f.configProvider = p
+ f.wf = workerFactory
+ return nil
+}
+
+func (f *Foo2) Serve() chan error {
+ const op = errors.Op("serve")
+ var err error
+ errCh := make(chan error, 1)
+ conf := &app.Config{}
+
+ // test payload for echo
+ r := roadrunner.Payload{
+ Context: nil,
+ Body: []byte(Response),
+ }
+
+ err = f.configProvider.UnmarshalKey(ConfigSection, conf)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test CMDFactory
+ cmd, err := f.wf.CmdFactory(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.E(op, "command is nil")
+ return errCh
+ }
+
+ // test worker creation
+ w, err := f.wf.NewWorker(context.Background(), nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test that our worker is functional
+ sw, err := roadrunner.NewSyncWorker(w)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ rsp, err := sw.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ if string(rsp.Body) != Response {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ // should not be errors
+ err = sw.Stop(context.Background())
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test pool
+ f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test pool execution
+ rsp, err = f.pool.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // echo of the "test" should be -> test
+ if string(rsp.Body) != Response {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ return errCh
+}
+
+func (f *Foo2) Stop() error {
+ f.pool.Destroy(context.Background())
+ return nil
+}
diff --git a/plugins/app/tests/plugin_tcp.go b/plugins/app/tests/plugin_tcp.go
new file mode 100644
index 00000000..6abc533d
--- /dev/null
+++ b/plugins/app/tests/plugin_tcp.go
@@ -0,0 +1,111 @@
+package tests
+
+import (
+ "context"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Foo3 struct {
+ configProvider config.Configurer
+ wf app.WorkerFactory
+ pool roadrunner.Pool
+}
+
+func (f *Foo3) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+ f.configProvider = p
+ f.wf = workerFactory
+ return nil
+}
+
+func (f *Foo3) Serve() chan error {
+ const op = errors.Op("serve")
+ var err error
+ errCh := make(chan error, 1)
+ conf := &app.Config{}
+
+ // test payload for echo
+ r := roadrunner.Payload{
+ Context: nil,
+ Body: []byte(Response),
+ }
+
+ err = f.configProvider.UnmarshalKey(ConfigSection, conf)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test CMDFactory
+ cmd, err := f.wf.CmdFactory(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.E(op, "command is nil")
+ return errCh
+ }
+
+ // test worker creation
+ w, err := f.wf.NewWorker(context.Background(), nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test that our worker is functional
+ sw, err := roadrunner.NewSyncWorker(w)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ rsp, err := sw.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ if string(rsp.Body) != Response {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ // should not be errors
+ err = sw.Stop(context.Background())
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test pool
+ f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test pool execution
+ rsp, err = f.pool.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // echo of the "test" should be -> test
+ if string(rsp.Body) != Response {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ return errCh
+}
+
+func (f *Foo3) Stop() error {
+ f.pool.Destroy(context.Background())
+ return nil
+}
diff --git a/plugins/app/tests/socket.php b/plugins/app/tests/socket.php
new file mode 100644
index 00000000..143c3ce4
--- /dev/null
+++ b/plugins/app/tests/socket.php
@@ -0,0 +1,25 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+
+$relay = new Goridge\SocketRelay(
+ "unix.sock",
+ null,
+ Goridge\SocketRelay::SOCK_UNIX
+ );
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ try {
+ $rr->send((string)$in);
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+}
diff --git a/plugins/app/tests/tcp.php b/plugins/app/tests/tcp.php
new file mode 100644
index 00000000..2d6fb00a
--- /dev/null
+++ b/plugins/app/tests/tcp.php
@@ -0,0 +1,20 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+
+$relay = new Goridge\SocketRelay("localhost", 9999);
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ try {
+ $rr->send((string)$in);
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+} \ No newline at end of file
diff --git a/plugins/config/provider.go b/plugins/config/configurer.go
index ac33b3de..00010eae 100755
--- a/plugins/config/provider.go
+++ b/plugins/config/configurer.go
@@ -1,9 +1,9 @@
package config
-type Provider interface {
+type Configurer interface {
// UnmarshalKey reads configuration section into configuration object.
//
- // func (h *HttpService) Init(cp config.Provider) error {
+ // func (h *HttpService) Init(cp config.Configurer) error {
// h.config := &HttpConfig{}
// if err := configProvider.UnmarshalKey("http", h.config); err != nil {
// return err
diff --git a/plugins/config/viper.go b/plugins/config/plugin.go
index 4e85af6b..2555d28a 100755
--- a/plugins/config/viper.go
+++ b/plugins/config/plugin.go
@@ -8,14 +8,14 @@ import (
"github.com/spf13/viper"
)
-type ViperProvider struct {
+type Viper struct {
viper *viper.Viper
Path string
Prefix string
}
// Inits config provider.
-func (v *ViperProvider) Init() error {
+func (v *Viper) Init() error {
v.viper = viper.New()
// read in environment variables that match
@@ -36,7 +36,7 @@ func (v *ViperProvider) Init() error {
}
// Overwrite overwrites existing config with provided values
-func (v *ViperProvider) Overwrite(values map[string]string) error {
+func (v *Viper) Overwrite(values map[string]string) error {
if len(values) != 0 {
for _, flag := range values {
key, value, err := parseFlag(flag)
@@ -51,7 +51,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error {
}
// UnmarshalKey reads configuration section into configuration object.
-func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error {
+func (v *Viper) UnmarshalKey(name string, out interface{}) error {
err := v.viper.UnmarshalKey(name, &out)
if err != nil {
return err
@@ -60,12 +60,12 @@ func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error {
}
// Get raw config in a form of config section.
-func (v *ViperProvider) Get(name string) interface{} {
+func (v *Viper) Get(name string) interface{} {
return v.viper.Get(name)
}
// Has checks if config section exists.
-func (v *ViperProvider) Has(name string) bool {
+func (v *Viper) Has(name string) bool {
return v.viper.IsSet(name)
}
diff --git a/plugins/config/tests/.rr.yaml b/plugins/config/tests/.rr.yaml
index df9077d0..732a1366 100755
--- a/plugins/config/tests/.rr.yaml
+++ b/plugins/config/tests/.rr.yaml
@@ -1,21 +1,12 @@
reload:
- # enable or disable file watcher
enabled: true
- # sync interval
interval: 1s
- # global patterns to sync
patterns: [".php"]
- # list of included for sync services
services:
http:
- # recursive search for file patterns to add
recursive: true
- # ignored folders
ignore: ["vendor"]
- # service specific file pattens to sync
patterns: [".php", ".go",".md",]
- # directories to sync. If recursive is set to true,
- # recursive sync will be applied only to the directories in `dirs` section
dirs: ["."]
jobs:
recursive: false
@@ -24,5 +15,4 @@ reload:
rpc:
recursive: true
patterns: [".json"]
- # to include all project directories from workdir, leave `dirs` empty or add a dot "."
dirs: [""]
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
index 14e60ac2..422e7eee 100755
--- a/plugins/config/tests/config_test.go
+++ b/plugins/config/tests/config_test.go
@@ -12,11 +12,11 @@ import (
)
func TestViperProvider_Init(t *testing.T) {
- container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true))
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
if err != nil {
t.Fatal(err)
}
- vp := &config.ViperProvider{}
+ vp := &config.Viper{}
vp.Path = ".rr.yaml"
vp.Prefix = "rr"
err = container.Register(vp)
diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go
index 7c5f2afd..a276c15f 100755
--- a/plugins/config/tests/plugin1.go
+++ b/plugins/config/tests/plugin1.go
@@ -23,11 +23,11 @@ type ServiceConfig struct {
}
type Foo struct {
- configProvider config.Provider
+ configProvider config.Configurer
}
// Depends on S2 and DB (S3 in the current case)
-func (f *Foo) Init(p config.Provider) error {
+func (f *Foo) Init(p config.Configurer) error {
f.configProvider = p
return nil
}
diff --git a/plugins/logger/encoder.go b/plugins/logger/encoder.go
index 66cd84f1..4ff583c4 100644
--- a/plugins/logger/encoder.go
+++ b/plugins/logger/encoder.go
@@ -61,6 +61,6 @@ func UTCTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
// returns string hash
func stringHash(name string, base int) int {
h := fnv.New32a()
- h.Write([]byte(name))
+ _, _ = h.Write([]byte(name))
return int(h.Sum32()) % base
}
diff --git a/plugins/logger/zap_logger.go b/plugins/logger/plugin.go
index 8c1739f2..f05d0ff0 100644
--- a/plugins/logger/zap_logger.go
+++ b/plugins/logger/plugin.go
@@ -2,6 +2,7 @@ package logger
import (
"github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/log"
"github.com/spiral/roadrunner/v2/plugins/config"
"go.uber.org/zap"
)
@@ -9,14 +10,6 @@ import (
// ServiceName declares service name.
const ServiceName = "logs"
-type LogFactory interface {
- // GlobalLogger returns global log instance.
- GlobalLogger() *zap.Logger
-
- // NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
- NamedLogger(name string) *zap.Logger
-}
-
// ZapLogger manages zap logger.
type ZapLogger struct {
base *zap.Logger
@@ -25,8 +18,8 @@ type ZapLogger struct {
}
// Init logger service.
-func (z *ZapLogger) Init(cfg config.Provider) (err error) {
- err = cfg.UnmarshalKey(ServiceName, &z.cfg)
+func (z *ZapLogger) Init(cfg config.Configurer) error {
+ err := cfg.UnmarshalKey(ServiceName, &z.cfg)
if err != nil {
return err
}
@@ -41,28 +34,32 @@ func (z *ZapLogger) Init(cfg config.Provider) (err error) {
}
// DefaultLogger returns default logger.
-func (z *ZapLogger) DefaultLogger() (*zap.Logger, error) {
- return z.base, nil
+func (z *ZapLogger) DefaultLogger() (log.Logger, error) {
+ return log.NewZapAdapter(z.base), nil
}
// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
-func (z *ZapLogger) NamedLogger(name string) (*zap.Logger, error) {
+func (z *ZapLogger) NamedLogger(name string) (log.Logger, error) {
if cfg, ok := z.channels.Channels[name]; ok {
- return cfg.BuildLogger()
+ l, err := cfg.BuildLogger()
+ if err != nil {
+ return nil, err
+ }
+ return log.NewZapAdapter(l), nil
}
- return z.base.Named(name), nil
+ return log.NewZapAdapter(z.base.Named(name)), nil
}
// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
-func (z *ZapLogger) ServiceLogger(n endure.Named) (*zap.Logger, error) {
+func (z *ZapLogger) ServiceLogger(n endure.Named) (log.Logger, error) {
return z.NamedLogger(n.Name())
}
// Provides declares factory methods.
func (z *ZapLogger) Provides() []interface{} {
return []interface{}{
- z.DefaultLogger,
z.ServiceLogger,
+ z.DefaultLogger,
}
}
diff --git a/plugins/logger/tests/.rr.yaml b/plugins/logger/tests/.rr.yaml
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/plugins/logger/tests/.rr.yaml
diff --git a/plugins/app/tests/factory_test.go b/plugins/logger/tests/logger_test.go
index 7c885797..1df74c47 100644
--- a/plugins/app/tests/factory_test.go
+++ b/plugins/logger/tests/logger_test.go
@@ -7,18 +7,18 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/roadrunner/v2/plugins/app"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/stretchr/testify/assert"
)
-func TestFactory(t *testing.T) {
- container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true))
+func TestLogger(t *testing.T) {
+ container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.DebugLevel))
if err != nil {
t.Fatal(err)
}
// config plugin
- vp := &config.ViperProvider{}
+ vp := &config.Viper{}
vp.Path = ".rr.yaml"
vp.Prefix = "rr"
err = container.Register(vp)
@@ -26,17 +26,12 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.App{})
+ err = container.Register(&Plugin{})
if err != nil {
t.Fatal(err)
}
- err = container.Register(&Foo{})
- if err != nil {
- t.Fatal(err)
- }
-
- err = container.Register(&Foo2{})
+ err = container.Register(&logger.ZapLogger{})
if err != nil {
t.Fatal(err)
}
@@ -55,7 +50,8 @@ func TestFactory(t *testing.T) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
- tt := time.NewTicker(time.Second * 2)
+ // stop after 10 seconds
+ tt := time.NewTicker(time.Second * 10)
for {
select {
diff --git a/plugins/logger/tests/plugin.go b/plugins/logger/tests/plugin.go
new file mode 100644
index 00000000..75d2736d
--- /dev/null
+++ b/plugins/logger/tests/plugin.go
@@ -0,0 +1,40 @@
+package tests
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/log"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Plugin struct {
+ config config.Configurer
+ log log.Logger
+}
+
+func (p1 *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ p1.config = cfg
+ p1.log = log
+ return nil
+}
+
+func (p1 *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ p1.log.Error("error", "test", errors.E(errors.Str("test")))
+ p1.log.Info("error", "test", errors.E(errors.Str("test")))
+ p1.log.Debug("error", "test", errors.E(errors.Str("test")))
+ p1.log.Warn("error", "test", errors.E(errors.Str("test")))
+
+ p1.log.Error("error", "test")
+ p1.log.Info("error", "test")
+ p1.log.Debug("error", "test")
+ p1.log.Warn("error", "test")
+ return errCh
+}
+
+func (p1 *Plugin) Stop() error {
+ return nil
+}
+
+func (p1 *Plugin) Name() string {
+ return "logger_plugin"
+}
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/plugin.go
index f299bd55..6401c0e2 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/plugin.go
@@ -1,18 +1,19 @@
package rpc
import (
+ "net"
"net/rpc"
-
- "go.uber.org/zap"
+ "sync/atomic"
"github.com/spiral/endure"
"github.com/spiral/errors"
"github.com/spiral/goridge/v2"
+ "github.com/spiral/roadrunner/v2/log"
"github.com/spiral/roadrunner/v2/plugins/config"
)
-// RPCPluggable declares the ability to create set of public RPC methods.
-type RPCPluggable interface {
+// Pluggable declares the ability to create set of public RPC methods.
+type Pluggable interface {
endure.Named
// Provides RPC methods for the given service.
@@ -20,21 +21,23 @@ type RPCPluggable interface {
}
// ServiceName contains default service name.
-const ServiceName = "rpc"
+const ServiceName = "RPC"
-// Service is RPC service.
-type Service struct {
+// Plugin is RPC service.
+type Plugin struct {
cfg Config
- log *zap.Logger
+ log log.Logger
rpc *rpc.Server
- services []RPCPluggable
- close chan struct{}
+ services []Pluggable
+ listener net.Listener
+ closed *uint32
}
// Init rpc service. Must return true if service is enabled.
-func (s *Service) Init(cfg config.Provider, log *zap.Logger) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("RPC Init")
if !cfg.Has(ServiceName) {
- return errors.E(errors.Disabled)
+ return errors.E(op, errors.Disabled)
}
err := cfg.UnmarshalKey(ServiceName, &s.cfg)
@@ -44,66 +47,68 @@ func (s *Service) Init(cfg config.Provider, log *zap.Logger) error {
s.cfg.InitDefaults()
if s.cfg.Disabled {
- return errors.E(errors.Disabled)
+ return errors.E(op, errors.Disabled)
}
s.log = log
+ state := uint32(0)
+ s.closed = &state
+ atomic.StoreUint32(s.closed, 0)
return s.cfg.Valid()
}
// Serve serves the service.
-func (s *Service) Serve() chan error {
+func (s *Plugin) Serve() chan error {
+ const op = errors.Op("register service")
errCh := make(chan error, 1)
- s.close = make(chan struct{}, 1)
s.rpc = rpc.NewServer()
- names := make([]string, 0, len(s.services))
+ services := make([]string, 0, len(s.services))
// Attach all services
for i := 0; i < len(s.services); i++ {
svc, err := s.services[i].RPCService()
if err != nil {
- errCh <- errors.E(errors.Op("register service"), err)
+ errCh <- errors.E(op, err)
return errCh
}
err = s.Register(s.services[i].Name(), svc)
if err != nil {
- errCh <- errors.E(errors.Op("register service"), err)
+ errCh <- errors.E(op, err)
return errCh
}
- names = append(names, s.services[i].Name())
+ services = append(services, s.services[i].Name())
}
- ln, err := s.cfg.Listener()
+ var err error
+ s.listener, err = s.cfg.Listener()
if err != nil {
errCh <- err
return errCh
}
- s.log.Debug("Started RPC service", zap.String("address", s.cfg.Listen), zap.Any("services", names))
+ s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services)
go func() {
for {
- select {
- case <-s.close:
- // log error
- err := ln.Close()
- if err != nil {
- errCh <- errors.E(errors.Op("close RPC socket"), err)
- }
- return
- default:
- conn, err := ln.Accept()
- if err != nil {
- continue
+ conn, err := s.listener.Accept()
+ if err != nil {
+ if atomic.LoadUint32(s.closed) == 1 {
+ // just log and continue, this is not a critical issue, we just called Stop
+ s.log.Error("listener accept error, connection closed", "error", err)
+ return
}
- go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ s.log.Error("listener accept error", "error", err)
+ errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
+ return
}
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
}
}()
@@ -111,25 +116,30 @@ func (s *Service) Serve() chan error {
}
// Stop stops the service.
-func (s *Service) Stop() error {
- s.close <- struct{}{}
+func (s *Plugin) Stop() error {
+ // store closed state
+ atomic.StoreUint32(s.closed, 1)
+ err := s.listener.Close()
+ if err != nil {
+ return errors.E(errors.Op("stop RPC socket"), err)
+ }
return nil
}
// Name contains service name.
-func (s *Service) Name() string {
+func (s *Plugin) Name() string {
return ServiceName
}
// Depends declares services to collect for RPC.
-func (s *Service) Depends() []interface{} {
+func (s *Plugin) Collects() []interface{} {
return []interface{}{
s.RegisterPlugin,
}
}
// RegisterPlugin registers RPC service plugin.
-func (s *Service) RegisterPlugin(p RPCPluggable) error {
+func (s *Plugin) RegisterPlugin(p Pluggable) error {
s.services = append(s.services, p)
return nil
}
@@ -142,7 +152,7 @@ func (s *Service) RegisterPlugin(p RPCPluggable) error {
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
-func (s *Service) Register(name string, svc interface{}) error {
+func (s *Plugin) Register(name string, svc interface{}) error {
if s.rpc == nil {
return errors.E("RPC service is not configured")
}
@@ -151,7 +161,7 @@ func (s *Service) Register(name string, svc interface{}) error {
}
// Client creates new RPC client.
-func (s *Service) Client() (*rpc.Client, error) {
+func (s *Plugin) Client() (*rpc.Client, error) {
conn, err := s.cfg.Dialer()
if err != nil {
return nil, err
diff --git a/plugins/rpc/tests/.rr-rpc-disabled.yaml b/plugins/rpc/tests/.rr-rpc-disabled.yaml
new file mode 100644
index 00000000..624fb3c5
--- /dev/null
+++ b/plugins/rpc/tests/.rr-rpc-disabled.yaml
@@ -0,0 +1,3 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: true \ No newline at end of file
diff --git a/plugins/rpc/tests/.rr.yaml b/plugins/rpc/tests/.rr.yaml
new file mode 100644
index 00000000..76e8b440
--- /dev/null
+++ b/plugins/rpc/tests/.rr.yaml
@@ -0,0 +1,3 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false \ No newline at end of file
diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go
new file mode 100644
index 00000000..788e6a2c
--- /dev/null
+++ b/plugins/rpc/tests/plugin1.go
@@ -0,0 +1,42 @@
+package tests
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Plugin1 struct {
+ config config.Configurer
+}
+
+func (p1 *Plugin1) Init(cfg config.Configurer) error {
+ p1.config = cfg
+ return nil
+}
+
+func (p1 *Plugin1) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
+
+func (p1 *Plugin1) Stop() error {
+ return nil
+}
+
+func (p1 *Plugin1) Name() string {
+ return "rpc_test.plugin1"
+}
+
+func (p1 *Plugin1) RPCService() (interface{}, error) {
+ return &PluginRpc{srv: p1}, nil
+}
+
+type PluginRpc struct {
+ srv *Plugin1
+}
+
+func (r *PluginRpc) Hello(in string, out *string) error {
+ *out = fmt.Sprintf("Hello, username: %s", in)
+ return nil
+}
diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go
new file mode 100644
index 00000000..854bf097
--- /dev/null
+++ b/plugins/rpc/tests/plugin2.go
@@ -0,0 +1,54 @@
+package tests
+
+import (
+ "net"
+ "net/rpc"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v2"
+)
+
+// plugin2 makes a call to the plugin1 via RPC
+// this is just a simulation of external call FOR TEST
+// you don't need to do such things :)
+type Plugin2 struct {
+}
+
+func (p2 *Plugin2) Init() error {
+ return nil
+}
+
+func (p2 *Plugin2) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ go func() {
+ time.Sleep(time.Second * 3)
+
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ errCh <- errors.E(errors.Serve, err)
+ return
+ }
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret string
+ err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ if ret != "Hello, username: Valery" {
+ errCh <- errors.E("wrong response")
+ return
+ }
+ // to stop exec
+ errCh <- errors.E(errors.Disabled)
+ return
+ }()
+
+ return errCh
+}
+
+func (p2 *Plugin2) Stop() error {
+ return nil
+}
diff --git a/plugins/rpc/tests/rpc_test.go b/plugins/rpc/tests/rpc_test.go
new file mode 100644
index 00000000..88267dfb
--- /dev/null
+++ b/plugins/rpc/tests/rpc_test.go
@@ -0,0 +1,169 @@
+package tests
+
+import (
+ "os"
+ "os/signal"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/stretchr/testify/assert"
+)
+
+// graph https://bit.ly/3ensdNb
+func TestRpcInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&Plugin1{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&Plugin2{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ v := &config.Viper{}
+ v.Path = ".rr.yaml"
+ v.Prefix = "rr"
+ err = cont.Register(v)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&rpc.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+ tt := time.NewTimer(time.Second * 10)
+
+ for {
+ select {
+ case e := <-ch:
+ // just stop, this is ok
+ if errors.Is(errors.Disabled, e.Error) {
+ return
+ }
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ assert.Fail(t, "timeout")
+ }
+ }
+}
+
+// graph https://bit.ly/3ensdNb
+func TestRpcDisabled(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&Plugin1{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&Plugin2{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ v := &config.Viper{}
+ v.Path = ".rr-rpc-disabled.yaml"
+ v.Prefix = "rr"
+ err = cont.Register(v)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&rpc.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+ tt := time.NewTimer(time.Second * 20)
+
+ for {
+ select {
+ case e := <-ch:
+ // RPC is turned off, should be and dial error
+ if errors.Is(errors.Disabled, e.Error) {
+ assert.FailNow(t, "should not be disabled error")
+ }
+ assert.Error(t, e.Error)
+ err = cont.Stop()
+ assert.Error(t, err)
+ return
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ return
+ }
+ }
+}