diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/app/plugin.go | 11 | ||||
-rw-r--r-- | plugins/app/tests/.rr.yaml | 2 | ||||
-rw-r--r-- | plugins/app/tests/factory_test.go | 10 | ||||
-rw-r--r-- | plugins/app/tests/hello.php | 3 | ||||
-rw-r--r-- | plugins/app/tests/plugin.go | 98 | ||||
-rw-r--r-- | plugins/app/tests/plugin_1.go | 55 | ||||
-rw-r--r-- | plugins/app/tests/plugin_2.go | 88 | ||||
-rw-r--r-- | plugins/logger/encoder.go | 2 | ||||
-rw-r--r-- | plugins/logger/plugin.go (renamed from plugins/logger/zap_logger.go) | 31 | ||||
-rw-r--r-- | plugins/logger/tests/plugin1.go | 25 | ||||
-rwxr-xr-x | plugins/rpc/plugin.go (renamed from plugins/rpc/rpc.go) | 15 | ||||
-rw-r--r-- | plugins/rpc/tests/plugin2.go | 3 |
12 files changed, 156 insertions, 187 deletions
diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go index 2ef851e8..839685bd 100644 --- a/plugins/app/plugin.go +++ b/plugins/app/plugin.go @@ -7,10 +7,9 @@ import ( "os/exec" "strings" - "go.uber.org/zap" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/log" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) @@ -29,12 +28,12 @@ type WorkerFactory interface { // Plugin manages worker type Plugin struct { cfg Config - log *zap.Logger + log log.Logger factory roadrunner.Factory } // Init application provider. -func (app *Plugin) Init(cfg config.Configurer, log *zap.Logger) error { +func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err @@ -170,9 +169,9 @@ func (app *Plugin) collectLogs(event interface{}) { if we, ok := event.(roadrunner.WorkerEvent); ok { switch we.Event { case roadrunner.EventWorkerError: - app.log.Error(we.Payload.(error).Error(), zap.Int64("pid", we.Worker.Pid())) + app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) case roadrunner.EventWorkerLog: - app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), zap.Int64("pid", we.Worker.Pid())) + app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) } } } diff --git a/plugins/app/tests/.rr.yaml b/plugins/app/tests/.rr.yaml index c9d1bd40..171f51dc 100644 --- a/plugins/app/tests/.rr.yaml +++ b/plugins/app/tests/.rr.yaml @@ -1,5 +1,5 @@ app: - command: "php plugins/app/tests/hello.php hello_echo" + command: "php hello.php" user: "" group: "" env: diff --git a/plugins/app/tests/factory_test.go b/plugins/app/tests/factory_test.go index 969c361c..878050a8 100644 --- a/plugins/app/tests/factory_test.go +++ b/plugins/app/tests/factory_test.go @@ -20,7 +20,7 @@ func TestFactory(t *testing.T) { } // config plugin vp := &config.Viper{} - vp.Path = "plugins/app/tests/.rr.yaml" + vp.Path = ".rr.yaml" vp.Prefix = "rr" err = container.Register(vp) if err != nil { @@ -37,11 +37,6 @@ func TestFactory(t *testing.T) { t.Fatal(err) } - err = container.Register(&Foo2{}) - if err != nil { - t.Fatal(err) - } - err = container.Register(&logger.ZapLogger{}) if err != nil { t.Fatal(err) @@ -61,7 +56,8 @@ func TestFactory(t *testing.T) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - tt := time.NewTicker(time.Second * 200) + // stop after 10 seconds + tt := time.NewTicker(time.Second * 10) for { select { diff --git a/plugins/app/tests/hello.php b/plugins/app/tests/hello.php index 4219dcf4..2591b77f 100644 --- a/plugins/app/tests/hello.php +++ b/plugins/app/tests/hello.php @@ -6,8 +6,9 @@ use Spiral\Goridge; use Spiral\RoadRunner; -require "/vendor_php/autoload.php"; +require dirname(__DIR__) . "/../../vendor_php/autoload.php"; +$relay = new Goridge\StreamRelay(STDIN, STDOUT); $rr = new RoadRunner\Worker($relay); while ($in = $rr->receive($ctx)) { diff --git a/plugins/app/tests/plugin.go b/plugins/app/tests/plugin.go new file mode 100644 index 00000000..e37aca01 --- /dev/null +++ b/plugins/app/tests/plugin.go @@ -0,0 +1,98 @@ +package tests + +import ( + "context" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +const ConfigSection = "app" + +var testPoolConfig = roadrunner.Config{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: 10, + MaxWorkerMemory: 1000, + }, +} + +type Foo struct { + configProvider config.Configurer + wf app.WorkerFactory + pool roadrunner.Pool +} + +func (f *Foo) Init(p config.Configurer, workerFactory app.WorkerFactory) error { + f.configProvider = p + f.wf = workerFactory + return nil +} + +func (f *Foo) Serve() chan error { + const op = errors.Op("serve") + errCh := make(chan error, 1) + + conf := &app.Config{} + var err error + err = f.configProvider.UnmarshalKey(ConfigSection, conf) + if err != nil { + errCh <- err + return errCh + } + + // test CMDFactory + cmd, err := f.wf.CmdFactory(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.E(op, "command is nil") + return errCh + } + + // test worker creation + _, err = f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + f.pool, err = f.wf.NewWorkerPool(context.Background(), testPoolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + r := roadrunner.Payload{ + Context: nil, + Body: []byte("test"), + } + rsp, err := f.pool.Exec(r) + if err != nil { + errCh <- err + return errCh + } + + if string(rsp.Body) != "test" { + errCh <- errors.E("response from worker is wrong", errors.Errorf("response: %s", rsp.Body)) + return errCh + } + + return errCh +} + +func (f *Foo) Stop() error { + f.pool.Destroy(context.Background()) + return nil +} diff --git a/plugins/app/tests/plugin_1.go b/plugins/app/tests/plugin_1.go deleted file mode 100644 index c6287f5c..00000000 --- a/plugins/app/tests/plugin_1.go +++ /dev/null @@ -1,55 +0,0 @@ -package tests - -import ( - "errors" - "fmt" - - "github.com/spiral/roadrunner/v2/plugins/app" - "github.com/spiral/roadrunner/v2/plugins/config" -) - -type Foo struct { - configProvider config.Configurer - spawner app.WorkerFactory -} - -func (f *Foo) Init(p config.Configurer, spw app.WorkerFactory) error { - f.configProvider = p - f.spawner = spw - return nil -} - -func (f *Foo) Serve() chan error { - errCh := make(chan error, 1) - - r := &app.Config{} - err := f.configProvider.UnmarshalKey("app", r) - if err != nil { - errCh <- err - return errCh - } - - cmd, err := f.spawner.CmdFactory(nil) - if err != nil { - errCh <- err - return errCh - } - if cmd == nil { - errCh <- errors.New("command is nil") - return errCh - } - a := cmd() - out, err := a.Output() - if err != nil { - errCh <- err - return errCh - } - - fmt.Println(string(out)) - - return errCh -} - -func (f *Foo) Stop() error { - return nil -} diff --git a/plugins/app/tests/plugin_2.go b/plugins/app/tests/plugin_2.go deleted file mode 100644 index 51fb83a4..00000000 --- a/plugins/app/tests/plugin_2.go +++ /dev/null @@ -1,88 +0,0 @@ -package tests - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" - "github.com/spiral/roadrunner/v2/plugins/config" -) - -type Foo2 struct { - configProvider config.Configurer - wf app.WorkerFactory -} - -func (f *Foo2) Init(p config.Configurer, workerFactory app.WorkerFactory) error { - f.configProvider = p - f.wf = workerFactory - return nil -} - -func (f *Foo2) Serve() chan error { - errCh := make(chan error, 1) - - r := &app.Config{} - err := f.configProvider.UnmarshalKey("app", r) - if err != nil { - errCh <- err - return errCh - } - - cmd, err := f.wf.CmdFactory(nil) - if err != nil { - errCh <- err - return errCh - } - if cmd == nil { - errCh <- errors.New("command is nil") - return errCh - } - a := cmd() - out, err := a.Output() - if err != nil { - errCh <- err - return errCh - } - - w, err := f.wf.NewWorker(context.Background(), nil) - if err != nil { - errCh <- err - return errCh - } - - _ = w - - poolConfig := roadrunner.Config{ - NumWorkers: 10, - MaxJobs: 100, - AllocateTimeout: time.Second * 10, - DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ - WatchTick: 60, - TTL: 1000, - IdleTTL: 10, - ExecTTL: 10, - MaxWorkerMemory: 1000, - }, - } - - pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) - if err != nil { - errCh <- err - return errCh - } - - _ = pool - - fmt.Println(string(out)) - - return errCh -} - -func (f *Foo2) Stop() error { - return nil -} diff --git a/plugins/logger/encoder.go b/plugins/logger/encoder.go index 66cd84f1..4ff583c4 100644 --- a/plugins/logger/encoder.go +++ b/plugins/logger/encoder.go @@ -61,6 +61,6 @@ func UTCTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { // returns string hash func stringHash(name string, base int) int { h := fnv.New32a() - h.Write([]byte(name)) + _, _ = h.Write([]byte(name)) return int(h.Sum32()) % base } diff --git a/plugins/logger/zap_logger.go b/plugins/logger/plugin.go index a22cdb22..f05d0ff0 100644 --- a/plugins/logger/zap_logger.go +++ b/plugins/logger/plugin.go @@ -2,6 +2,7 @@ package logger import ( "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/log" "github.com/spiral/roadrunner/v2/plugins/config" "go.uber.org/zap" ) @@ -9,14 +10,6 @@ import ( // ServiceName declares service name. const ServiceName = "logs" -type LogFactory interface { - // GlobalLogger returns global log instance. - GlobalLogger() *zap.Logger - - // NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params. - NamedLogger(name string) *zap.Logger -} - // ZapLogger manages zap logger. type ZapLogger struct { base *zap.Logger @@ -25,8 +18,8 @@ type ZapLogger struct { } // Init logger service. -func (z *ZapLogger) Init(cfg config.Configurer) (err error) { - err = cfg.UnmarshalKey(ServiceName, &z.cfg) +func (z *ZapLogger) Init(cfg config.Configurer) error { + err := cfg.UnmarshalKey(ServiceName, &z.cfg) if err != nil { return err } @@ -41,28 +34,32 @@ func (z *ZapLogger) Init(cfg config.Configurer) (err error) { } // DefaultLogger returns default logger. -func (z *ZapLogger) DefaultLogger() (*zap.Logger, error) { - return z.base, nil +func (z *ZapLogger) DefaultLogger() (log.Logger, error) { + return log.NewZapAdapter(z.base), nil } // NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params. -func (z *ZapLogger) NamedLogger(name string) (*zap.Logger, error) { +func (z *ZapLogger) NamedLogger(name string) (log.Logger, error) { if cfg, ok := z.channels.Channels[name]; ok { - return cfg.BuildLogger() + l, err := cfg.BuildLogger() + if err != nil { + return nil, err + } + return log.NewZapAdapter(l), nil } - return z.base.Named(name), nil + return log.NewZapAdapter(z.base.Named(name)), nil } // NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params. -func (z *ZapLogger) ServiceLogger(n endure.Named) (*zap.Logger, error) { +func (z *ZapLogger) ServiceLogger(n endure.Named) (log.Logger, error) { return z.NamedLogger(n.Name()) } // Provides declares factory methods. func (z *ZapLogger) Provides() []interface{} { return []interface{}{ - z.DefaultLogger, z.ServiceLogger, + z.DefaultLogger, } } diff --git a/plugins/logger/tests/plugin1.go b/plugins/logger/tests/plugin1.go index ca8701d2..c4ca1371 100644 --- a/plugins/logger/tests/plugin1.go +++ b/plugins/logger/tests/plugin1.go @@ -1 +1,26 @@ package tests + +import ( + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Plugin1 struct { + config config.Configurer + log *logger.ZapLogger +} + +func (p1 *Plugin1) Init(cfg config.Configurer, log *logger.ZapLogger) error { + p1.config = cfg + p1.log = log + return nil +} + +func (p1 *Plugin1) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +func (p1 *Plugin1) Stop() error { + return nil +} diff --git a/plugins/rpc/rpc.go b/plugins/rpc/plugin.go index 5203fc65..6401c0e2 100755 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/plugin.go @@ -5,11 +5,10 @@ import ( "net/rpc" "sync/atomic" - "go.uber.org/zap" - "github.com/spiral/endure" "github.com/spiral/errors" "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/log" "github.com/spiral/roadrunner/v2/plugins/config" ) @@ -22,12 +21,12 @@ type Pluggable interface { } // ServiceName contains default service name. -const ServiceName = "rpc" +const ServiceName = "RPC" // Plugin is RPC service. type Plugin struct { cfg Config - log *zap.Logger + log log.Logger rpc *rpc.Server services []Pluggable listener net.Listener @@ -35,7 +34,7 @@ type Plugin struct { } // Init rpc service. Must return true if service is enabled. -func (s *Plugin) Init(cfg config.Configurer, log *zap.Logger) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error { const op = errors.Op("RPC Init") if !cfg.Has(ServiceName) { return errors.E(op, errors.Disabled) @@ -92,7 +91,7 @@ func (s *Plugin) Serve() chan error { return errCh } - s.log.Debug("Started RPC service", zap.String("address", s.cfg.Listen), zap.Any("services", services)) + s.log.Debug("Started RPC service", "address", s.cfg.Listen, "services", services) go func() { for { @@ -100,11 +99,11 @@ func (s *Plugin) Serve() chan error { if err != nil { if atomic.LoadUint32(s.closed) == 1 { // just log and continue, this is not a critical issue, we just called Stop - s.log.Error("listener accept error, connection closed", zap.Error(err)) + s.log.Error("listener accept error, connection closed", "error", err) return } - s.log.Error("listener accept error", zap.Error(err)) + s.log.Error("listener accept error", "error", err) errCh <- errors.E(errors.Op("listener accept"), errors.Serve, err) return } diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go index 2a0ae1a8..854bf097 100644 --- a/plugins/rpc/tests/plugin2.go +++ b/plugins/rpc/tests/plugin2.go @@ -1,7 +1,6 @@ package tests import ( - "fmt" "net" "net/rpc" "time" @@ -38,8 +37,6 @@ func (p2 *Plugin2) Serve() chan error { errCh <- err return } - fmt.Println("--------------") - fmt.Println(ret) if ret != "Hello, username: Valery" { errCh <- errors.E("wrong response") return |