summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--[-rwxr-xr-x]plugins/app/app.go (renamed from plugins/factory/app.go)25
-rw-r--r--[-rwxr-xr-x]plugins/app/config.go (renamed from plugins/factory/config.go)2
-rw-r--r--[-rwxr-xr-x]plugins/app/tests/.rr.yaml (renamed from plugins/factory/tests/.rr.yaml)0
-rw-r--r--[-rwxr-xr-x]plugins/app/tests/factory_test.go (renamed from plugins/factory/tests/factory_test.go)4
-rw-r--r--[-rwxr-xr-x]plugins/app/tests/hello.php (renamed from plugins/factory/tests/hello.php)0
-rw-r--r--[-rwxr-xr-x]plugins/app/tests/plugin_1.go (renamed from plugins/factory/tests/plugin_1.go)10
-rw-r--r--[-rwxr-xr-x]plugins/app/tests/plugin_2.go (renamed from plugins/factory/tests/plugin_2.go)10
-rw-r--r--plugins/logger/config.go94
-rw-r--r--plugins/logger/encoder.go66
-rw-r--r--plugins/logger/zap_logger.go68
-rwxr-xr-xplugins/rpc/rpc.go68
11 files changed, 295 insertions, 52 deletions
diff --git a/plugins/factory/app.go b/plugins/app/app.go
index 4951e3df..ebb42631 100755..100644
--- a/plugins/factory/app.go
+++ b/plugins/app/app.go
@@ -1,8 +1,9 @@
-package factory
+package app
import (
"context"
"fmt"
+ "go.uber.org/zap"
"log"
"os"
"os/exec"
@@ -19,9 +20,9 @@ const ServiceName = "app"
type Env map[string]string
-// AppFactory creates workers for the application.
-type AppFactory interface {
- NewCmdFactory(env Env) (func() *exec.Cmd, error)
+// WorkerFactory creates workers for the application.
+type WorkerFactory interface {
+ CmdFactory(env Env) (func() *exec.Cmd, error)
NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
}
@@ -29,16 +30,18 @@ type AppFactory interface {
// App manages worker
type App struct {
cfg Config
+ log *zap.Logger
factory roadrunner.Factory
}
// Init application provider.
-func (app *App) Init(cfg config.Provider) error {
+func (app *App) Init(cfg config.Provider, log *zap.Logger) error {
err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
}
app.cfg.InitDefaults()
+ app.log = log
return nil
}
@@ -57,6 +60,8 @@ func (app *App) Serve() chan error {
errCh <- errors.E(errors.Op("init factory"), err)
}
+ app.log.Info("Started worker factory", zap.Any("relay", app.cfg.Relay), zap.Any("command", app.cfg.Command))
+
return errCh
}
@@ -68,7 +73,8 @@ func (app *App) Stop() error {
return app.factory.Close(context.Background())
}
-func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) {
+// CmdFactory provides worker command factory assocated with given context.
+func (app *App) CmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
@@ -93,8 +99,9 @@ func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) {
}, nil
}
+// NewWorker issues new standalone worker.
func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
- spawnCmd, err := app.NewCmdFactory(env)
+ spawnCmd, err := app.CmdFactory(env)
if err != nil {
return nil, err
}
@@ -102,8 +109,9 @@ func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase,
return app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
}
+// NewWorkerPool issues new worker pool.
func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
- spawnCmd, err := app.NewCmdFactory(env)
+ spawnCmd, err := app.CmdFactory(env)
if err != nil {
return nil, err
}
@@ -124,6 +132,7 @@ func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env En
return p, nil
}
+// creates relay and worker factory.
func (app *App) initFactory() (roadrunner.Factory, error) {
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
diff --git a/plugins/factory/config.go b/plugins/app/config.go
index b2d1d0ad..eaa54e2d 100755..100644
--- a/plugins/factory/config.go
+++ b/plugins/app/config.go
@@ -1,4 +1,4 @@
-package factory
+package app
import "time"
diff --git a/plugins/factory/tests/.rr.yaml b/plugins/app/tests/.rr.yaml
index 171f51dc..171f51dc 100755..100644
--- a/plugins/factory/tests/.rr.yaml
+++ b/plugins/app/tests/.rr.yaml
diff --git a/plugins/factory/tests/factory_test.go b/plugins/app/tests/factory_test.go
index 6c264fd6..7c885797 100755..100644
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/app/tests/factory_test.go
@@ -7,8 +7,8 @@ import (
"time"
"github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/app"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/factory"
"github.com/stretchr/testify/assert"
)
@@ -26,7 +26,7 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&factory.App{})
+ err = container.Register(&app.App{})
if err != nil {
t.Fatal(err)
}
diff --git a/plugins/factory/tests/hello.php b/plugins/app/tests/hello.php
index bf9e82cc..bf9e82cc 100755..100644
--- a/plugins/factory/tests/hello.php
+++ b/plugins/app/tests/hello.php
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/app/tests/plugin_1.go
index 9011bb00..7259ea9d 100755..100644
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/app/tests/plugin_1.go
@@ -4,16 +4,16 @@ import (
"errors"
"fmt"
+ "github.com/spiral/roadrunner/v2/plugins/app"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/factory"
)
type Foo struct {
configProvider config.Provider
- spawner factory.AppFactory
+ spawner app.WorkerFactory
}
-func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error {
+func (f *Foo) Init(p config.Provider, spw app.WorkerFactory) error {
f.configProvider = p
f.spawner = spw
return nil
@@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error {
func (f *Foo) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.Config{}
+ r := &app.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spawner.NewCmdFactory(nil)
+ cmd, err := f.spawner.CmdFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/app/tests/plugin_2.go
index 2311b7bf..fbb9ca11 100755..100644
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/app/tests/plugin_2.go
@@ -7,16 +7,16 @@ import (
"time"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/app"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/factory"
)
type Foo2 struct {
configProvider config.Provider
- wf factory.AppFactory
+ wf app.WorkerFactory
}
-func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error {
+func (f *Foo2) Init(p config.Provider, workerFactory app.WorkerFactory) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -25,14 +25,14 @@ func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error {
func (f *Foo2) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.Config{}
+ r := &app.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.wf.NewCmdFactory(nil)
+ cmd, err := f.wf.CmdFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/logger/config.go b/plugins/logger/config.go
new file mode 100644
index 00000000..f7a5742c
--- /dev/null
+++ b/plugins/logger/config.go
@@ -0,0 +1,94 @@
+package logger
+
+import (
+ "strings"
+
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+)
+
+// ChannelConfig configures loggers per channel.
+type ChannelConfig struct {
+ // Dedicated channels per logger. By default logger allocated via named logger.
+ Channels map[string]Config `json:"channels" yaml:"channels"`
+}
+
+type Config struct {
+ // Mode configures logger based on some default template (development, production, off).
+ Mode string `json:"mode" yaml:"mode"`
+
+ // Level is the minimum enabled logging level. Note that this is a dynamic
+ // level, so calling ChannelConfig.Level.SetLevel will atomically change the log
+ // level of all loggers descended from this config.
+ Level string `json:"level" yaml:"level"`
+
+ // Encoding sets the logger's encoding. Valid values are "json" and
+ // "console", as well as any third-party encodings registered via
+ // RegisterEncoder.
+ Encoding string `json:"encoding" yaml:"encoding"`
+
+ // Output is a list of URLs or file paths to write logging output to.
+ // See Open for details.
+ Output []string `json:"output" yaml:"output"`
+
+ // ErrorOutput is a list of URLs to write internal logger errors to.
+ // The default is standard error.
+ //
+ // Note that this setting only affects internal errors; for sample code that
+ // sends error-level logs to a different location from info- and debug-level
+ // logs, see the package-level AdvancedConfiguration example.
+ ErrorOutput []string `json:"errorOutput" yaml:"errorOutput"`
+}
+
+// ZapConfig converts config into Zap configuration.
+func (cfg *Config) BuildLogger() (*zap.Logger, error) {
+ var zCfg zap.Config
+ switch strings.ToLower(cfg.Mode) {
+ case "off", "none":
+ return zap.NewNop(), nil
+ case "production":
+ zCfg = zap.NewProductionConfig()
+ case "development":
+ zCfg = zap.NewDevelopmentConfig()
+ default:
+ zCfg = zap.Config{
+ Level: zap.NewAtomicLevelAt(zap.DebugLevel),
+ Encoding: "console",
+ EncoderConfig: zapcore.EncoderConfig{
+ MessageKey: "message",
+ LevelKey: "level",
+ TimeKey: "time",
+ NameKey: "name",
+ EncodeName: ColoredHashedNameEncoder,
+ EncodeLevel: ColoredLevelEncoder,
+ EncodeTime: UTCTimeEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ },
+ OutputPaths: []string{"stderr"},
+ ErrorOutputPaths: []string{"stderr"},
+ }
+ }
+
+ if cfg.Level != "" {
+ level := zap.NewAtomicLevel()
+ if err := level.UnmarshalText([]byte(cfg.Level)); err == nil {
+ zCfg.Level = level
+ }
+ }
+
+ if cfg.Encoding != "" {
+ zCfg.Encoding = cfg.Encoding
+ }
+
+ if len(cfg.Output) != 0 {
+ zCfg.OutputPaths = cfg.Output
+ }
+
+ if len(cfg.ErrorOutput) != 0 {
+ zCfg.ErrorOutputPaths = cfg.ErrorOutput
+ }
+
+ // todo: https://github.com/uber-go/zap/blob/master/FAQ.md#does-zap-support-log-rotation
+
+ return zCfg.Build()
+}
diff --git a/plugins/logger/encoder.go b/plugins/logger/encoder.go
new file mode 100644
index 00000000..66cd84f1
--- /dev/null
+++ b/plugins/logger/encoder.go
@@ -0,0 +1,66 @@
+package logger
+
+import (
+ "hash/fnv"
+ "strings"
+ "time"
+
+ "github.com/fatih/color"
+ "go.uber.org/zap/zapcore"
+)
+
+var colorMap = []func(string, ...interface{}) string{
+ color.HiYellowString,
+ color.HiGreenString,
+ color.HiBlueString,
+ color.HiRedString,
+ color.HiCyanString,
+ color.HiMagentaString,
+}
+
+// ColoredLevelEncoder colorizes log levels.
+func ColoredLevelEncoder(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
+ switch level {
+ case zapcore.DebugLevel:
+ enc.AppendString(color.HiWhiteString(level.CapitalString()))
+ case zapcore.InfoLevel:
+ enc.AppendString(color.HiCyanString(level.CapitalString()))
+ case zapcore.WarnLevel:
+ enc.AppendString(color.HiYellowString(level.CapitalString()))
+ case zapcore.ErrorLevel, zapcore.DPanicLevel:
+ enc.AppendString(color.HiRedString(level.CapitalString()))
+ case zapcore.PanicLevel, zapcore.FatalLevel:
+ enc.AppendString(color.HiMagentaString(level.CapitalString()))
+ }
+}
+
+// ColoredNameEncoder colorizes service names.
+func ColoredNameEncoder(s string, enc zapcore.PrimitiveArrayEncoder) {
+ if len(s) < 12 {
+ s += strings.Repeat(" ", 12-len(s))
+ }
+
+ enc.AppendString(color.HiGreenString(s))
+}
+
+// ColoredHashedNameEncoder colorizes service names and assigns different colors to different names.
+func ColoredHashedNameEncoder(s string, enc zapcore.PrimitiveArrayEncoder) {
+ if len(s) < 12 {
+ s += strings.Repeat(" ", 12-len(s))
+ }
+
+ colorID := stringHash(s, len(colorMap))
+ enc.AppendString(colorMap[colorID](s))
+}
+
+// UTCTimeEncoder encodes time into short UTC specific timestamp.
+func UTCTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
+ enc.AppendString(t.UTC().Format("2006/01/02 15:04:05"))
+}
+
+// returns string hash
+func stringHash(name string, base int) int {
+ h := fnv.New32a()
+ h.Write([]byte(name))
+ return int(h.Sum32()) % base
+}
diff --git a/plugins/logger/zap_logger.go b/plugins/logger/zap_logger.go
new file mode 100644
index 00000000..8c1739f2
--- /dev/null
+++ b/plugins/logger/zap_logger.go
@@ -0,0 +1,68 @@
+package logger
+
+import (
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "go.uber.org/zap"
+)
+
+// ServiceName declares service name.
+const ServiceName = "logs"
+
+type LogFactory interface {
+ // GlobalLogger returns global log instance.
+ GlobalLogger() *zap.Logger
+
+ // NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
+ NamedLogger(name string) *zap.Logger
+}
+
+// ZapLogger manages zap logger.
+type ZapLogger struct {
+ base *zap.Logger
+ cfg Config
+ channels ChannelConfig
+}
+
+// Init logger service.
+func (z *ZapLogger) Init(cfg config.Provider) (err error) {
+ err = cfg.UnmarshalKey(ServiceName, &z.cfg)
+ if err != nil {
+ return err
+ }
+
+ err = cfg.UnmarshalKey(ServiceName, &z.channels)
+ if err != nil {
+ return err
+ }
+
+ z.base, err = z.cfg.BuildLogger()
+ return err
+}
+
+// DefaultLogger returns default logger.
+func (z *ZapLogger) DefaultLogger() (*zap.Logger, error) {
+ return z.base, nil
+}
+
+// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
+func (z *ZapLogger) NamedLogger(name string) (*zap.Logger, error) {
+ if cfg, ok := z.channels.Channels[name]; ok {
+ return cfg.BuildLogger()
+ }
+
+ return z.base.Named(name), nil
+}
+
+// NamedLogger returns logger dedicated to the specific channel. Similar to Named() but also reads the core params.
+func (z *ZapLogger) ServiceLogger(n endure.Named) (*zap.Logger, error) {
+ return z.NamedLogger(n.Name())
+}
+
+// Provides declares factory methods.
+func (z *ZapLogger) Provides() []interface{} {
+ return []interface{}{
+ z.DefaultLogger,
+ z.ServiceLogger,
+ }
+}
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index 0f6c9753..d272fc72 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -3,6 +3,8 @@ package rpc
import (
"net/rpc"
+ "go.uber.org/zap"
+
"github.com/spiral/endure"
"github.com/spiral/endure/errors"
"github.com/spiral/goridge/v2"
@@ -20,65 +22,70 @@ type RPCPluggable interface {
// ServiceName contains default service name.
const ServiceName = "rpc"
-type services struct {
- service interface{}
- name string
-}
-
// Service is RPC service.
type Service struct {
+ cfg Config
+ log *zap.Logger
rpc *rpc.Server
- services []services
- config Config
+ services []RPCPluggable
close chan struct{}
}
// Init rpc service. Must return true if service is enabled.
-func (s *Service) Init(cfg config.Provider) error {
+func (s *Service) Init(cfg config.Provider, log *zap.Logger) error {
if !cfg.Has(ServiceName) {
return errors.E(errors.Disabled)
}
- err := cfg.UnmarshalKey(ServiceName, &s.config)
+ err := cfg.UnmarshalKey(ServiceName, &s.cfg)
if err != nil {
return err
}
- s.config.InitDefaults()
+ s.cfg.InitDefaults()
- if s.config.Disabled {
+ if s.cfg.Disabled {
return errors.E(errors.Disabled)
}
- return s.config.Valid()
-}
+ s.log = log
-// Name contains service name.
-func (s *Service) Name() string {
- return ServiceName
+ return s.cfg.Valid()
}
// Serve serves the service.
func (s *Service) Serve() chan error {
- s.close = make(chan struct{}, 1)
errCh := make(chan error, 1)
+ s.close = make(chan struct{}, 1)
s.rpc = rpc.NewServer()
+ names := make([]string, 0, len(s.services))
+
// Attach all services
for i := 0; i < len(s.services); i++ {
- err := s.Register(s.services[i].name, s.services[i].service)
+ svc, err := s.services[i].RPCService()
+ if err != nil {
+ errCh <- errors.E(errors.Op("register service"), err)
+ return errCh
+ }
+
+ err = s.Register(s.services[i].Name(), svc)
if err != nil {
errCh <- errors.E(errors.Op("register service"), err)
return errCh
}
+
+ names = append(names, s.services[i].Name())
}
- ln, err := s.config.Listener()
+ ln, err := s.cfg.Listener()
if err != nil {
errCh <- err
return errCh
}
+ s.log.Debug("Started RPC service", zap.String("address", s.cfg.Listen), zap.Any("services", names))
+
go func() {
for {
select {
@@ -109,22 +116,21 @@ func (s *Service) Stop() error {
return nil
}
+// Name contains service name.
+func (s *Service) Name() string {
+ return ServiceName
+}
+
+// Depends declares services to collect for RPC.
func (s *Service) Depends() []interface{} {
return []interface{}{
- s.RegisterService,
+ s.RegisterPlugin,
}
}
-func (s *Service) RegisterService(p RPCPluggable) error {
- service, err := p.RPCService()
- if err != nil {
- return err
- }
-
- s.services = append(s.services, services{
- service: service,
- name: p.Name(),
- })
+// RegisterPlugin registers RPC service plugin.
+func (s *Service) RegisterPlugin(p RPCPluggable) error {
+ s.services = append(s.services, p)
return nil
}
@@ -146,7 +152,7 @@ func (s *Service) Register(name string, svc interface{}) error {
// Client creates new RPC client.
func (s *Service) Client() (*rpc.Client, error) {
- conn, err := s.config.Dialer()
+ conn, err := s.cfg.Dialer()
if err != nil {
return nil, err
}