summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-05 15:45:18 +0300
committerValery Piashchynski <[email protected]>2020-11-05 15:45:18 +0300
commitbca69d968ef76a6260956bc169cb07996fbb7724 (patch)
treebcca677f40d0dd19da5095bfe8dc5c6bacf64566 /plugins
parent3a0a0190ed130e72ebc150bbb3e3c1582f22dd16 (diff)
App test
Diffstat (limited to 'plugins')
-rw-r--r--plugins/app/plugin.go11
-rw-r--r--plugins/app/tests/.rr.yaml2
-rw-r--r--plugins/app/tests/factory_test.go10
-rw-r--r--plugins/app/tests/hello.php3
-rw-r--r--plugins/app/tests/plugin.go98
-rw-r--r--plugins/app/tests/plugin_1.go55
-rw-r--r--plugins/app/tests/plugin_2.go88
-rw-r--r--plugins/logger/encoder.go2
-rw-r--r--plugins/logger/plugin.go (renamed from plugins/logger/zap_logger.go)31
-rw-r--r--plugins/logger/tests/plugin1.go25
-rwxr-xr-xplugins/rpc/plugin.go (renamed from plugins/rpc/rpc.go)15
-rw-r--r--plugins/rpc/tests/plugin2.go3
12 files changed, 156 insertions, 187 deletions
diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go
index 2ef851e8..839685bd 100644
--- a/plugins/app/plugin.go
+++ b/plugins/app/plugin.go
@@ -7,10 +7,9 @@ import (
"os/exec"
"strings"
- "go.uber.org/zap"
-
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/log"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
@@ -29,12 +28,12 @@ type WorkerFactory interface {
// Plugin manages worker
type Plugin struct {
cfg Config
- log *zap.Logger
+ log log.Logger
factory roadrunner.Factory
}
// Init application provider.
-func (app *Plugin) Init(cfg config.Configurer, log *zap.Logger) error {
+func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
@@ -170,9 +169,9 @@ func (app *Plugin) collectLogs(event interface{}) {
if we, ok := event.(roadrunner.WorkerEvent); ok {
switch we.Event {
case roadrunner.EventWorkerError:
- app.log.Error(we.Payload.(error).Error(), zap.Int64("pid", we.Worker.Pid()))
+ app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
case roadrunner.EventWorkerLog:
- app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), zap.Int64("pid", we.Worker.Pid()))
+ app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
}
}
}
diff --git a/plugins/app/tests/.rr.yaml b/plugins/app/tests/.rr.yaml
index c9d1bd40..171f51dc 100644
--- a/plugins/app/tests/.rr.yaml
+++ b/plugins/app/tests/.rr.yaml
@@ -1,5 +1,5 @@
app:
- command: "php plugins/app/tests/hello.php hello_echo"
+ command: "php hello.php"
user: ""
group: ""
env:
diff --git a/plugins/app/tests/factory_test.go b/plugins/app/tests/factory_test.go
index 969c361c..878050a8 100644
--- a/plugins/app/tests/factory_test.go
+++ b/plugins/app/tests/factory_test.go
@@ -20,7 +20,7 @@ func TestFactory(t *testing.T) {
}
// config plugin
vp := &config.Viper{}
- vp.Path = "plugins/app/tests/.rr.yaml"
+ vp.Path = ".rr.yaml"
vp.Prefix = "rr"
err = container.Register(vp)
if err != nil {
@@ -37,11 +37,6 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&Foo2{})
- if err != nil {
- t.Fatal(err)
- }
-
err = container.Register(&logger.ZapLogger{})
if err != nil {
t.Fatal(err)
@@ -61,7 +56,8 @@ func TestFactory(t *testing.T) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
- tt := time.NewTicker(time.Second * 200)
+ // stop after 10 seconds
+ tt := time.NewTicker(time.Second * 10)
for {
select {
diff --git a/plugins/app/tests/hello.php b/plugins/app/tests/hello.php
index 4219dcf4..2591b77f 100644
--- a/plugins/app/tests/hello.php
+++ b/plugins/app/tests/hello.php
@@ -6,8 +6,9 @@
use Spiral\Goridge;
use Spiral\RoadRunner;
-require "/vendor_php/autoload.php";
+require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+$relay = new Goridge\StreamRelay(STDIN, STDOUT);
$rr = new RoadRunner\Worker($relay);
while ($in = $rr->receive($ctx)) {
diff --git a/plugins/app/tests/plugin.go b/plugins/app/tests/plugin.go
new file mode 100644
index 00000000..e37aca01
--- /dev/null
+++ b/plugins/app/tests/plugin.go
@@ -0,0 +1,98 @@
+package tests
+
+import (
+ "context"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+const ConfigSection = "app"
+
+var testPoolConfig = roadrunner.Config{
+ NumWorkers: 10,
+ MaxJobs: 100,
+ AllocateTimeout: time.Second * 10,
+ DestroyTimeout: time.Second * 10,
+ Supervisor: &roadrunner.SupervisorConfig{
+ WatchTick: 60,
+ TTL: 1000,
+ IdleTTL: 10,
+ ExecTTL: 10,
+ MaxWorkerMemory: 1000,
+ },
+}
+
+type Foo struct {
+ configProvider config.Configurer
+ wf app.WorkerFactory
+ pool roadrunner.Pool
+}
+
+func (f *Foo) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+ f.configProvider = p
+ f.wf = workerFactory
+ return nil
+}
+
+func (f *Foo) Serve() chan error {
+ const op = errors.Op("serve")
+ errCh := make(chan error, 1)
+
+ conf := &app.Config{}
+ var err error
+ err = f.configProvider.UnmarshalKey(ConfigSection, conf)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // test CMDFactory
+ cmd, err := f.wf.CmdFactory(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.E(op, "command is nil")
+ return errCh
+ }
+
+ // test worker creation
+ _, err = f.wf.NewWorker(context.Background(), nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ r := roadrunner.Payload{
+ Context: nil,
+ Body: []byte("test"),
+ }
+ rsp, err := f.pool.Exec(r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ if string(rsp.Body) != "test" {
+ errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body))
+ return errCh
+ }
+
+ return errCh
+}
+
+func (f *Foo) Stop() error {
+ f.pool.Destroy(context.Background())
+ return nil
+}
diff --git a/plugins/app/tests/plugin_1.go b/plugins/app/tests/plugin_1.go
deleted file mode 100644
index c6287f5c..00000000
--- a/plugins/app/tests/plugin_1.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package tests
-
-import (
- "errors"
- "fmt"
-
- "github.com/spiral/roadrunner/v2/plugins/app"
- "github.com/spiral/roadrunner/v2/plugins/config"
-)
-
-type Foo struct {
- configProvider config.Configurer
- spawner app.WorkerFactory
-}
-
-func (f *Foo) Init(p config.Configurer, spw app.WorkerFactory) error {
- f.configProvider = p
- f.spawner = spw
- return nil
-}
-
-func (f *Foo) Serve() chan error {
- errCh := make(chan error, 1)
-
- r := &app.Config{}
- err := f.configProvider.UnmarshalKey("app", r)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- cmd, err := f.spawner.CmdFactory(nil)
- if err != nil {
- errCh <- err
- return errCh
- }
- if cmd == nil {
- errCh <- errors.New("command is nil")
- return errCh
- }
- a := cmd()
- out, err := a.Output()
- if err != nil {
- errCh <- err
- return errCh
- }
-
- fmt.Println(string(out))
-
- return errCh
-}
-
-func (f *Foo) Stop() error {
- return nil
-}
diff --git a/plugins/app/tests/plugin_2.go b/plugins/app/tests/plugin_2.go
deleted file mode 100644
index 51fb83a4..00000000
--- a/plugins/app/tests/plugin_2.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package tests
-
-import (
- "context"
- "errors"
- "fmt"
- "time"
-
- "github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/app"
- "github.com/spiral/roadrunner/v2/plugins/config"
-)
-
-type Foo2 struct {
- configProvider config.Configurer
- wf app.WorkerFactory
-}
-
-func (f *Foo2) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
- f.configProvider = p
- f.wf = workerFactory
- return nil
-}
-
-func (f *Foo2) Serve() chan error {
- errCh := make(chan error, 1)
-
- r := &app.Config{}
- err := f.configProvider.UnmarshalKey("app", r)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- cmd, err := f.wf.CmdFactory(nil)
- if err != nil {
- errCh <- err
- return errCh
- }
- if cmd == nil {
- errCh <- errors.New("command is nil")
- return errCh
- }
- a := cmd()
- out, err := a.Output()
- if err != nil {
- errCh <- err
- return errCh
- }
-
- w, err := f.wf.NewWorker(context.Background(), nil)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- _ = w
-
- poolConfig := roadrunner.Config{
- NumWorkers: 10,
- MaxJobs: 100,
- AllocateTimeout: time.Second * 10,
- DestroyTimeout: time.Second * 10,
- Supervisor: &roadrunner.SupervisorConfig{
- WatchTick: 60,
- TTL: 1000,
- IdleTTL: 10,
- ExecTTL: 10,
- MaxWorkerMemory: 1000,
- },
- }
-
- pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- _ = pool
-
- fmt.Println(string(out))
-
- return errCh
-}
-
-func (f *Foo2) Stop() error {
- return nil
-}
diff --git a/plugins/logger/encoder.go b/plugins/logger/encoder.go
index 66cd84f1..4ff583c4 100644
--- a/plugins/logger/encoder.go
+++ b/plugins/logger/encoder.go
@@ -61,6 +61,6 @@ func UTCTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
// returns string hash
func stringHash(name string, base int) int {
h := fnv.New32a()
- h.Write([]byte(name))
+ _, _ = h.Write([]byte(name))
return int(h.Sum32()) % base
}
diff --git a/plugins/logger/zap_logger.go b/plugins/logger/plugin.go
index a22cdb22..f05d0ff0 100644
--- a/plugins/logger/zap_logger.go
+++ b/plugins/logger/plugin.go
@@ -2,6 +2,7 @@ package logger
import (
"github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/log"
"github.com/spiral/roadrunner/v2/plugins/config"
"go.uber.org/zap"
)
@@ -9,14 +10,6 @@ import (
// ServiceName declares service name.
const ServiceName = "logs"
-type LogFactory interface {
- // GlobalLogger returns global log instance.
- GlobalLogger() *zap.Logger
-
- // NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
- NamedLogger(name string) *zap.Logger
-}
-
// ZapLogger manages zap logger.
type ZapLogger struct {
base *zap.Logger
@@ -25,8 +18,8 @@ type ZapLogger struct {
}
// Init logger service.
-func (z *ZapLogger) Init(cfg config.Configurer) (err error) {
- err = cfg.UnmarshalKey(ServiceName, &z.cfg)
+func (z *ZapLogger) Init(cfg config.Configurer) error {
+ err := cfg.UnmarshalKey(ServiceName, &z.cfg)
if err != nil {
return err
}
@@ -41,28 +34,32 @@ func (z *ZapLogger) Init(cfg config.Configurer) (err error) {
}
// DefaultLogger returns default logger.
-func (z *ZapLogger) DefaultLogger() (*zap.Logger, error) {
- return z.base, nil
+func (z *ZapLogger) DefaultLogger() (log.Logger, error) {
+ return log.NewZapAdapter(z.base), nil
}
// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
-func (z *ZapLogger) NamedLogger(name string) (*zap.Logger, error) {
+func (z *ZapLogger) NamedLogger(name string) (log.Logger, error) {
if cfg, ok := z.channels.Channels[name]; ok {
- return cfg.BuildLogger()
+ l, err := cfg.BuildLogger()
+ if err != nil {
+ return nil, err
+ }
+ return log.NewZapAdapter(l), nil
}
- return z.base.Named(name), nil
+ return log.NewZapAdapter(z.base.Named(name)), nil
}
// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
-func (z *ZapLogger) ServiceLogger(n endure.Named) (*zap.Logger, error) {
+func (z *ZapLogger) ServiceLogger(n endure.Named) (log.Logger, error) {
return z.NamedLogger(n.Name())
}
// Provides declares factory methods.
func (z *ZapLogger) Provides() []interface{} {
return []interface{}{
- z.DefaultLogger,
z.ServiceLogger,
+ z.DefaultLogger,
}
}
diff --git a/plugins/logger/tests/plugin1.go b/plugins/logger/tests/plugin1.go
index ca8701d2..c4ca1371 100644
--- a/plugins/logger/tests/plugin1.go
+++ b/plugins/logger/tests/plugin1.go
@@ -1 +1,26 @@
package tests
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Plugin1 struct {
+ config config.Configurer
+ log *logger.ZapLogger
+}
+
+func (p1 *Plugin1) Init(cfg config.Configurer, log *logger.ZapLogger) error {
+ p1.config = cfg
+ p1.log = log
+ return nil
+}
+
+func (p1 *Plugin1) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
+
+func (p1 *Plugin1) Stop() error {
+ return nil
+}
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/plugin.go
index 5203fc65..6401c0e2 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/plugin.go
@@ -5,11 +5,10 @@ import (
"net/rpc"
"sync/atomic"
- "go.uber.org/zap"
-
"github.com/spiral/endure"
"github.com/spiral/errors"
"github.com/spiral/goridge/v2"
+ "github.com/spiral/roadrunner/v2/log"
"github.com/spiral/roadrunner/v2/plugins/config"
)
@@ -22,12 +21,12 @@ type Pluggable interface {
}
// ServiceName contains default service name.
-const ServiceName = "rpc"
+const ServiceName = "RPC"
// Plugin is RPC service.
type Plugin struct {
cfg Config
- log *zap.Logger
+ log log.Logger
rpc *rpc.Server
services []Pluggable
listener net.Listener
@@ -35,7 +34,7 @@ type Plugin struct {
}
// Init rpc service. Must return true if service is enabled.
-func (s *Plugin) Init(cfg config.Configurer, log *zap.Logger) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
const op = errors.Op("RPC Init")
if !cfg.Has(ServiceName) {
return errors.E(op, errors.Disabled)
@@ -92,7 +91,7 @@ func (s *Plugin) Serve() chan error {
return errCh
}
- s.log.Debug("Started RPC service", zap.String("address", s.cfg.Listen), zap.Any("services", services))
+ s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services)
go func() {
for {
@@ -100,11 +99,11 @@ func (s *Plugin) Serve() chan error {
if err != nil {
if atomic.LoadUint32(s.closed) == 1 {
// just log and continue, this is not a critical issue, we just called Stop
- s.log.Error("listener accept error, connection closed", zap.Error(err))
+ s.log.Error("listener accept error, connection closed", "error", err)
return
}
- s.log.Error("listener accept error", zap.Error(err))
+ s.log.Error("listener accept error", "error", err)
errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err)
return
}
diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go
index 2a0ae1a8..854bf097 100644
--- a/plugins/rpc/tests/plugin2.go
+++ b/plugins/rpc/tests/plugin2.go
@@ -1,7 +1,6 @@
package tests
import (
- "fmt"
"net"
"net/rpc"
"time"
@@ -38,8 +37,6 @@ func (p2 *Plugin2) Serve() chan error {
errCh <- err
return
}
- fmt.Println("--------------")
- fmt.Println(ret)
if ret != "Hello, username: Valery" {
errCh <- errors.E("wrong response")
return