summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.vscode/settings.json1
-rw-r--r--pkg/events/grpc_event.go39
-rw-r--r--plugins/grpc/config.go5
-rw-r--r--plugins/grpc/plugin.go143
-rw-r--r--plugins/grpc/proxy/proxy.go21
-rw-r--r--plugins/grpc/server.go36
-rw-r--r--tests/psr-worker-bench.php1
7 files changed, 224 insertions, 22 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 50bc942d..5fba80db 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -19,6 +19,7 @@
"mget",
"prefetch",
"proto",
+ "protobuf",
"SETEX",
"shivammathur",
"srem",
diff --git a/pkg/events/grpc_event.go b/pkg/events/grpc_event.go
new file mode 100644
index 00000000..31ff4957
--- /dev/null
+++ b/pkg/events/grpc_event.go
@@ -0,0 +1,39 @@
+package events
+
+import (
+ "time"
+
+ "google.golang.org/grpc"
+)
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk G = iota + 13000
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+)
+
+type G int64
+
+func (ev G) String() string {
+ switch ev {
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represent job event.
+type GRPCEvent struct {
+ Event G
+ // Info contains unary call info.
+ Info *grpc.UnaryServerInfo
+ // Error associated with event.
+ Error error
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go
index 8a3af6a2..87bbf7ae 100644
--- a/plugins/grpc/config.go
+++ b/plugins/grpc/config.go
@@ -12,7 +12,10 @@ type Config struct {
TLS *TLS
- grpcPool pool.Config
+ // Env is environment variables passed to the http pool
+ Env map[string]string
+
+ GrpcPool pool.Config
MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"`
MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"`
MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"`
diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go
index 9285afe5..579a00a4 100644
--- a/plugins/grpc/plugin.go
+++ b/plugins/grpc/plugin.go
@@ -1,35 +1,66 @@
package grpc
import (
+ "context"
+ "sync"
+
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/grpc/codec"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)
const (
- name string = "grpc"
+ name string = "grpc"
+ RrGrpc string = "RR_GRPC"
)
type Plugin struct {
+ mu *sync.RWMutex
config *Config
gPool pool.Pool
opts []grpc.ServerOption
services []func(server *grpc.Server)
+ server *grpc.Server
+ rrServer server.Server
- cfg config.Configurer
- log logger.Logger
+ // events handler
+ events events.Handler
+ log logger.Logger
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("grpc_plugin_init")
+ if !cfg.Has(name) {
+ return errors.E(errors.Disabled)
+ }
// register the codec
encoding.RegisterCodec(&codec.Codec{})
+ err := cfg.UnmarshalKey(name, p.config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.opts = make([]grpc.ServerOption, 0)
+ p.services = make([]func(server *grpc.Server), 0)
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(p.collectGRPCEvents)
+ p.rrServer = server
+
+ // worker's GRPC mode
+ p.config.Env[RrGrpc] = "true"
+
+ p.log = log
+
return nil
}
@@ -37,10 +68,61 @@ func (p *Plugin) Serve() chan error {
const op = errors.Op("grpc_plugin_serve")
errCh := make(chan error, 1)
+ var err error
+ p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{
+ Debug: p.config.GrpcPool.Debug,
+ NumWorkers: p.config.GrpcPool.NumWorkers,
+ MaxJobs: p.config.GrpcPool.MaxJobs,
+ AllocateTimeout: p.config.GrpcPool.AllocateTimeout,
+ DestroyTimeout: p.config.GrpcPool.DestroyTimeout,
+ Supervisor: p.config.GrpcPool.Supervisor,
+ }, p.config.Env, p.collectGRPCEvents)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ go func() {
+ var err error
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.server, err = p.createGRPCserver()
+ if err != nil {
+ p.log.Error("create grpc server", "error", err)
+ errCh <- errors.E(op, err)
+ return
+ }
+
+ l, err := utils.CreateListener(p.config.Listen)
+ if err != nil {
+ p.log.Error("create grpc listener", "error", err)
+ errCh <- errors.E(op, err)
+ }
+
+ // protect serve
+ err = p.server.Serve(l)
+ if err != nil {
+ // skip errors when stopping the server
+ if err == grpc.ErrServerStopped {
+ return
+ }
+
+ p.log.Error("grpc server stopped", "error", err)
+ errCh <- errors.E(op, err)
+ return
+ }
+ }()
+
return errCh
}
func (p *Plugin) Stop() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.server != nil {
+ p.server.GracefulStop()
+ }
return nil
}
@@ -49,3 +131,56 @@ func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
return name
}
+
+func (p *Plugin) Reset() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ const op = errors.Op("grpc_plugin_reset")
+
+ // destroy old pool
+ p.gPool.Destroy(context.Background())
+
+ var err error
+ p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{
+ Debug: p.config.GrpcPool.Debug,
+ NumWorkers: p.config.GrpcPool.NumWorkers,
+ MaxJobs: p.config.GrpcPool.MaxJobs,
+ AllocateTimeout: p.config.GrpcPool.AllocateTimeout,
+ DestroyTimeout: p.config.GrpcPool.DestroyTimeout,
+ Supervisor: p.config.GrpcPool.Supervisor,
+ }, p.config.Env, p.collectGRPCEvents)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (p *Plugin) Workers() []*process.State {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ workers := p.gPool.Workers()
+
+ ps := make([]*process.State, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
+}
+
+func (p *Plugin) collectGRPCEvents(event interface{}) {
+ if gev, ok := event.(events.GRPCEvent); ok {
+ switch gev.Event {
+ case events.EventUnaryCallOk:
+ p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed)
+ case events.EventUnaryCallErr:
+ p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed)
+ }
+ }
+}
diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go
index 9e406bbf..074aac85 100644
--- a/plugins/grpc/proxy/proxy.go
+++ b/plugins/grpc/proxy/proxy.go
@@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"strings"
+ "sync"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
@@ -19,6 +20,12 @@ import (
"google.golang.org/protobuf/types/known/anypb"
)
+const (
+ peerAddr string = ":peer.address"
+ peerAuthType string = ":peer.auth-type"
+ delimiter string = "|:|"
+)
+
// base interface for Proxy class
type proxyService interface {
// RegisterMethod registers new RPC method.
@@ -37,6 +44,7 @@ type rpcContext struct {
// Proxy manages GRPC/RoadRunner bridge.
type Proxy struct {
+ mu *sync.RWMutex
grpcPool pool.Pool
name string
metadata string
@@ -44,8 +52,9 @@ type Proxy struct {
}
// NewProxy creates new service proxy object.
-func NewProxy(name string, metadata string, grpcPool pool.Pool) *Proxy {
+func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy {
return &Proxy{
+ mu: mu,
grpcPool: grpcPool,
name: name,
metadata: metadata,
@@ -110,7 +119,9 @@ func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage)
return nil, err
}
+ p.mu.RLock()
resp, err := p.grpcPool.Exec(payload)
+ p.mu.RUnlock()
if err != nil {
return nil, wrapError(err)
@@ -160,9 +171,9 @@ func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMe
}
if pr, ok := peer.FromContext(ctx); ok {
- ctxMD[":peer.address"] = []string{pr.Addr.String()}
+ ctxMD[peerAddr] = []string{pr.Addr.String()}
if pr.AuthInfo != nil {
- ctxMD[":peer.auth-type"] = []string{pr.AuthInfo.AuthType()}
+ ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()}
}
}
@@ -178,8 +189,8 @@ func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMe
// mounts proper error code for the error
func wrapError(err error) error {
// internal agreement
- if strings.Contains(err.Error(), "|:|") {
- chunks := strings.Split(err.Error(), "|:|")
+ if strings.Contains(err.Error(), delimiter) {
+ chunks := strings.Split(err.Error(), delimiter)
code := codes.Internal
// protect the slice access
diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go
index 24759fba..323f73a0 100644
--- a/plugins/grpc/server.go
+++ b/plugins/grpc/server.go
@@ -7,8 +7,10 @@ import (
"fmt"
"os"
"path"
+ "time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/plugins/grpc/parser"
"github.com/spiral/roadrunner/v2/plugins/grpc/proxy"
"google.golang.org/grpc"
@@ -33,7 +35,7 @@ func (p *Plugin) createGRPCserver() (*grpc.Server, error) {
}
for _, service := range services {
- p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool)
+ p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool, p.mu)
for _, m := range service.Methods {
p.RegisterMethod(m.Name)
}
@@ -50,19 +52,29 @@ func (p *Plugin) createGRPCserver() (*grpc.Server, error) {
return server, nil
}
-func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- // start := time.Now()
- resp, err = handler(ctx, req)
+func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ start := time.Now()
+ resp, err := handler(ctx, req)
+ if err != nil {
+ p.events.Push(events.GRPCEvent{
+ Event: events.EventUnaryCallErr,
+ Info: info,
+ Error: err,
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
+ return nil, err
+ }
- // svc.throw(EventUnaryCall, &UnaryCallEvent{
- // Info: info,
- // Context: ctx,
- // Error: err,
- // start: start,
- // elapsed: time.Since(start),
- // })
+ p.events.Push(events.GRPCEvent{
+ Event: events.EventUnaryCallOk,
+ Info: info,
+ Start: start,
+ Elapsed: time.Since(start),
+ })
- return resp, err
+ return resp, nil
}
func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) {
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index 80fc435c..f8b1b47c 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -18,6 +18,7 @@ $psr7 = new RoadRunner\Http\PSR7Worker(
while ($req = $psr7->waitRequest()) {
try {
+ sleep(3);
$resp = new \Nyholm\Psr7\Response();
$resp->getBody()->write("hello world");