diff options
author | Valery Piashchynski <[email protected]> | 2021-09-15 17:13:10 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-15 17:13:10 +0300 |
commit | d445f59b5f9c55719dec16eca96722a913f9e839 (patch) | |
tree | d62a1b7b2f1cacca89e4302bab904cb6fabb416b | |
parent | e4c84c703c2c798e4fa8ff8cf97e5e59e81ef4ed (diff) |
Finish GRPC plugin
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | .vscode/settings.json | 1 | ||||
-rw-r--r-- | pkg/events/grpc_event.go | 39 | ||||
-rw-r--r-- | plugins/grpc/config.go | 5 | ||||
-rw-r--r-- | plugins/grpc/plugin.go | 143 | ||||
-rw-r--r-- | plugins/grpc/proxy/proxy.go | 21 | ||||
-rw-r--r-- | plugins/grpc/server.go | 36 | ||||
-rw-r--r-- | tests/psr-worker-bench.php | 1 |
7 files changed, 224 insertions, 22 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index 50bc942d..5fba80db 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,6 +19,7 @@ "mget", "prefetch", "proto", + "protobuf", "SETEX", "shivammathur", "srem", diff --git a/pkg/events/grpc_event.go b/pkg/events/grpc_event.go new file mode 100644 index 00000000..31ff4957 --- /dev/null +++ b/pkg/events/grpc_event.go @@ -0,0 +1,39 @@ +package events + +import ( + "time" + + "google.golang.org/grpc" +) + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk G = iota + 13000 + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr +) + +type G int64 + +func (ev G) String() string { + switch ev { + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + } + return UnknownEventType +} + +// JobEvent represent job event. +type GRPCEvent struct { + Event G + // Info contains unary call info. + Info *grpc.UnaryServerInfo + // Error associated with event. + Error error + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go index 8a3af6a2..87bbf7ae 100644 --- a/plugins/grpc/config.go +++ b/plugins/grpc/config.go @@ -12,7 +12,10 @@ type Config struct { TLS *TLS - grpcPool pool.Config + // Env is environment variables passed to the http pool + Env map[string]string + + GrpcPool pool.Config MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"` MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"` MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"` diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go index 9285afe5..579a00a4 100644 --- a/plugins/grpc/plugin.go +++ b/plugins/grpc/plugin.go @@ -1,35 +1,66 @@ package grpc import ( + "context" + "sync" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/grpc/codec" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) const ( - name string = "grpc" + name string = "grpc" + RrGrpc string = "RR_GRPC" ) type Plugin struct { + mu *sync.RWMutex config *Config gPool pool.Pool opts []grpc.ServerOption services []func(server *grpc.Server) + server *grpc.Server + rrServer server.Server - cfg config.Configurer - log logger.Logger + // events handler + events events.Handler + log logger.Logger } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("grpc_plugin_init") + if !cfg.Has(name) { + return errors.E(errors.Disabled) + } // register the codec encoding.RegisterCodec(&codec.Codec{}) + err := cfg.UnmarshalKey(name, p.config) + if err != nil { + return errors.E(op, err) + } + + p.opts = make([]grpc.ServerOption, 0) + p.services = make([]func(server *grpc.Server), 0) + p.events = events.NewEventsHandler() + p.events.AddListener(p.collectGRPCEvents) + p.rrServer = server + + // worker's GRPC mode + p.config.Env[RrGrpc] = "true" + + p.log = log + return nil } @@ -37,10 +68,61 @@ func (p *Plugin) Serve() chan error { const op = errors.Op("grpc_plugin_serve") errCh := make(chan error, 1) + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + go func() { + var err error + p.mu.Lock() + defer p.mu.Unlock() + p.server, err = p.createGRPCserver() + if err != nil { + p.log.Error("create grpc server", "error", err) + errCh <- errors.E(op, err) + return + } + + l, err := utils.CreateListener(p.config.Listen) + if err != nil { + p.log.Error("create grpc listener", "error", err) + errCh <- errors.E(op, err) + } + + // protect serve + err = p.server.Serve(l) + if err != nil { + // skip errors when stopping the server + if err == grpc.ErrServerStopped { + return + } + + p.log.Error("grpc server stopped", "error", err) + errCh <- errors.E(op, err) + return + } + }() + return errCh } func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.server != nil { + p.server.GracefulStop() + } return nil } @@ -49,3 +131,56 @@ func (p *Plugin) Available() {} func (p *Plugin) Name() string { return name } + +func (p *Plugin) Reset() error { + p.mu.Lock() + defer p.mu.Unlock() + const op = errors.Op("grpc_plugin_reset") + + // destroy old pool + p.gPool.Destroy(context.Background()) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (p *Plugin) Workers() []*process.State { + p.mu.RLock() + defer p.mu.RUnlock() + + workers := p.gPool.Workers() + + ps := make([]*process.State, 0, len(workers)) + for i := 0; i < len(workers); i++ { + state, err := process.WorkerProcessState(workers[i]) + if err != nil { + return nil + } + ps = append(ps, state) + } + + return ps +} + +func (p *Plugin) collectGRPCEvents(event interface{}) { + if gev, ok := event.(events.GRPCEvent); ok { + switch gev.Event { + case events.EventUnaryCallOk: + p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + case events.EventUnaryCallErr: + p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + } + } +} diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go index 9e406bbf..074aac85 100644 --- a/plugins/grpc/proxy/proxy.go +++ b/plugins/grpc/proxy/proxy.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" @@ -19,6 +20,12 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +const ( + peerAddr string = ":peer.address" + peerAuthType string = ":peer.auth-type" + delimiter string = "|:|" +) + // base interface for Proxy class type proxyService interface { // RegisterMethod registers new RPC method. @@ -37,6 +44,7 @@ type rpcContext struct { // Proxy manages GRPC/RoadRunner bridge. type Proxy struct { + mu *sync.RWMutex grpcPool pool.Pool name string metadata string @@ -44,8 +52,9 @@ type Proxy struct { } // NewProxy creates new service proxy object. -func NewProxy(name string, metadata string, grpcPool pool.Pool) *Proxy { +func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy { return &Proxy{ + mu: mu, grpcPool: grpcPool, name: name, metadata: metadata, @@ -110,7 +119,9 @@ func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) return nil, err } + p.mu.RLock() resp, err := p.grpcPool.Exec(payload) + p.mu.RUnlock() if err != nil { return nil, wrapError(err) @@ -160,9 +171,9 @@ func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMe } if pr, ok := peer.FromContext(ctx); ok { - ctxMD[":peer.address"] = []string{pr.Addr.String()} + ctxMD[peerAddr] = []string{pr.Addr.String()} if pr.AuthInfo != nil { - ctxMD[":peer.auth-type"] = []string{pr.AuthInfo.AuthType()} + ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()} } } @@ -178,8 +189,8 @@ func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMe // mounts proper error code for the error func wrapError(err error) error { // internal agreement - if strings.Contains(err.Error(), "|:|") { - chunks := strings.Split(err.Error(), "|:|") + if strings.Contains(err.Error(), delimiter) { + chunks := strings.Split(err.Error(), delimiter) code := codes.Internal // protect the slice access diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go index 24759fba..323f73a0 100644 --- a/plugins/grpc/server.go +++ b/plugins/grpc/server.go @@ -7,8 +7,10 @@ import ( "fmt" "os" "path" + "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/plugins/grpc/parser" "github.com/spiral/roadrunner/v2/plugins/grpc/proxy" "google.golang.org/grpc" @@ -33,7 +35,7 @@ func (p *Plugin) createGRPCserver() (*grpc.Server, error) { } for _, service := range services { - p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool) + p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool, p.mu) for _, m := range service.Methods { p.RegisterMethod(m.Name) } @@ -50,19 +52,29 @@ func (p *Plugin) createGRPCserver() (*grpc.Server, error) { return server, nil } -func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - // start := time.Now() - resp, err = handler(ctx, req) +func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + start := time.Now() + resp, err := handler(ctx, req) + if err != nil { + p.events.Push(events.GRPCEvent{ + Event: events.EventUnaryCallErr, + Info: info, + Error: err, + Start: start, + Elapsed: time.Since(start), + }) + + return nil, err + } - // svc.throw(EventUnaryCall, &UnaryCallEvent{ - // Info: info, - // Context: ctx, - // Error: err, - // start: start, - // elapsed: time.Since(start), - // }) + p.events.Push(events.GRPCEvent{ + Event: events.EventUnaryCallOk, + Info: info, + Start: start, + Elapsed: time.Since(start), + }) - return resp, err + return resp, nil } func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) { diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index 80fc435c..f8b1b47c 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -18,6 +18,7 @@ $psr7 = new RoadRunner\Http\PSR7Worker( while ($req = $psr7->waitRequest()) { try { + sleep(3); $resp = new \Nyholm\Psr7\Response(); $resp->getBody()->write("hello world"); |