diff options
Diffstat (limited to 'plugins/grpc')
21 files changed, 1686 insertions, 0 deletions
diff --git a/plugins/grpc/codec/codec.go b/plugins/grpc/codec/codec.go new file mode 100644 index 00000000..a9d89ac5 --- /dev/null +++ b/plugins/grpc/codec/codec.go @@ -0,0 +1,44 @@ +package codec + +import "google.golang.org/grpc/encoding" + +type RawMessage []byte + +// By default, gRPC registers and uses the "proto" codec, so it is not necessary to do this in your own code to send and receive proto messages. +// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec +const cName string = "proto" +const rm string = "rawMessage" + +func (r RawMessage) Reset() {} +func (RawMessage) ProtoMessage() {} +func (RawMessage) String() string { return rm } + +type Codec struct{ base encoding.Codec } + +// Marshal returns the wire format of v. rawMessages would be returned without encoding. +func (c *Codec) Marshal(v interface{}) ([]byte, error) { + if raw, ok := v.(RawMessage); ok { + return raw, nil + } + + return c.base.Marshal(v) +} + +// Unmarshal parses the wire format into v. rawMessages would not be unmarshalled. +func (c *Codec) Unmarshal(data []byte, v interface{}) error { + if raw, ok := v.(*RawMessage); ok { + *raw = data + return nil + } + + return c.base.Unmarshal(data, v) +} + +func (c *Codec) Name() string { + return cName +} + +// String return codec name. +func (c *Codec) String() string { + return "raw:" + c.base.Name() +} diff --git a/plugins/grpc/codec/codec_test.go b/plugins/grpc/codec/codec_test.go new file mode 100644 index 00000000..60efb072 --- /dev/null +++ b/plugins/grpc/codec/codec_test.go @@ -0,0 +1,79 @@ +package codec + +import ( + "testing" + + json "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" +) + +type jsonCodec struct{} + +func (jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (jsonCodec) Unmarshal(data []byte, v interface{}) error { + return json.Unmarshal(data, v) +} + +func (jsonCodec) Name() string { + return "json" +} + +func TestCodec_String(t *testing.T) { + c := Codec{jsonCodec{}} + + assert.Equal(t, "raw:json", c.String()) + + r := RawMessage{} + r.Reset() + r.ProtoMessage() + assert.Equal(t, "rawMessage", r.String()) +} + +func TestCodec_Unmarshal_ByPass(t *testing.T) { + c := Codec{jsonCodec{}} + + s := struct { + Name string + }{} + + assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) + assert.Equal(t, "name", s.Name) +} + +func TestCodec_Marshal_ByPass(t *testing.T) { + c := Codec{jsonCodec{}} + + s := struct { + Name string + }{ + Name: "name", + } + + d, err := c.Marshal(s) + assert.NoError(t, err) + + assert.Equal(t, `{"Name":"name"}`, string(d)) +} + +func TestCodec_Unmarshal_Raw(t *testing.T) { + c := Codec{jsonCodec{}} + + s := RawMessage{} + + assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) + assert.Equal(t, `{"name":"name"}`, string(s)) +} + +func TestCodec_Marshal_Raw(t *testing.T) { + c := Codec{jsonCodec{}} + + s := RawMessage(`{"Name":"name"}`) + + d, err := c.Marshal(s) + assert.NoError(t, err) + + assert.Equal(t, `{"Name":"name"}`, string(d)) +} diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go new file mode 100644 index 00000000..fedd4998 --- /dev/null +++ b/plugins/grpc/config.go @@ -0,0 +1,128 @@ +package grpc + +import ( + "math" + "os" + "strings" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" +) + +type Config struct { + Listen string `mapstructure:"listen"` + Proto string `mapstructure:"proto"` + + TLS *TLS + + // Env is environment variables passed to the http pool + Env map[string]string `mapstructure:"env"` + + GrpcPool *pool.Config `mapstructure:"pool"` + MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"` + MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"` + MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"` + MaxConnectionAge time.Duration `mapstructure:"max_connection_age"` + MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace"` + MaxConcurrentStreams int64 `mapstructure:"max_concurrent_streams"` + PingTime time.Duration `mapstructure:"ping_time"` + Timeout time.Duration `mapstructure:"timeout"` +} + +type TLS struct { + Key string + Cert string + RootCA string +} + +func (c *Config) InitDefaults() error { //nolint:gocognit + const op = errors.Op("grpc_plugin_config") + if c.GrpcPool == nil { + c.GrpcPool = &pool.Config{} + } + + c.GrpcPool.InitDefaults() + + if !strings.Contains(c.Listen, ":") { + return errors.E(op, errors.Errorf("mailformed grpc grpc address, provided: %s", c.Listen)) + } + + if c.EnableTLS() { + if _, err := os.Stat(c.TLS.Key); err != nil { + if os.IsNotExist(err) { + return errors.E(op, errors.Errorf("key file '%s' does not exists", c.TLS.Key)) + } + + return errors.E(op, err) + } + + if _, err := os.Stat(c.TLS.Cert); err != nil { + if os.IsNotExist(err) { + return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert)) + } + + return errors.E(op, err) + } + + // RootCA is optional, but if provided - check it + if c.TLS.RootCA != "" { + if _, err := os.Stat(c.TLS.RootCA); err != nil { + if os.IsNotExist(err) { + return errors.E(op, errors.Errorf("root ca path provided, but key file '%s' does not exists", c.TLS.RootCA)) + } + return errors.E(op, err) + } + } + } + + // used to set max time + infinity := time.Duration(math.MaxInt64) + + if c.PingTime == 0 { + c.PingTime = time.Hour * 2 + } + + if c.Timeout == 0 { + c.Timeout = time.Second * 20 + } + + if c.MaxConcurrentStreams == 0 { + c.MaxConcurrentStreams = 10 + } + // set default + if c.MaxConnectionAge == 0 { + c.MaxConnectionAge = infinity + } + + // set default + if c.MaxConnectionIdle == 0 { + c.MaxConnectionIdle = infinity + } + + if c.MaxConnectionAgeGrace == 0 { + c.MaxConnectionAgeGrace = infinity + } + + if c.MaxRecvMsgSize == 0 { + c.MaxRecvMsgSize = 1024 * 1024 * 50 + } else { + c.MaxRecvMsgSize = 1024 * 1024 * c.MaxRecvMsgSize + } + + if c.MaxSendMsgSize == 0 { + c.MaxSendMsgSize = 1024 * 1024 * 50 + } else { + c.MaxSendMsgSize = 1024 * 1024 * c.MaxSendMsgSize + } + + return nil +} + +func (c *Config) EnableTLS() bool { + if c.TLS != nil { + return (c.TLS.RootCA != "" && c.TLS.Key != "" && c.TLS.Cert != "") || (c.TLS.Key != "" && c.TLS.Cert != "") + } + + return false +} diff --git a/plugins/grpc/parser/message.proto b/plugins/grpc/parser/message.proto new file mode 100644 index 00000000..a4012010 --- /dev/null +++ b/plugins/grpc/parser/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package app.namespace; + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/parse.go b/plugins/grpc/parser/parse.go new file mode 100644 index 00000000..d59b0927 --- /dev/null +++ b/plugins/grpc/parser/parse.go @@ -0,0 +1,114 @@ +package parser + +import ( + "bytes" + "io" + "os" + + pp "github.com/emicklei/proto" +) + +// Service contains information about singular GRPC service. +type Service struct { + // Package defines service namespace. + Package string + + // Name defines service name. + Name string + + // Methods list. + Methods []Method +} + +// Method describes singular RPC method. +type Method struct { + // Name is method name. + Name string + + // StreamsRequest defines if method accept stream input. + StreamsRequest bool + + // RequestType defines message name (from the same package) of method input. + RequestType string + + // StreamsReturns defines if method streams result. + StreamsReturns bool + + // ReturnsType defines message name (from the same package) of method return value. + ReturnsType string +} + +// File parses given proto file or returns error. +func File(file string, importPath string) ([]Service, error) { + reader, _ := os.Open(file) + defer reader.Close() + + return parse(reader, importPath) +} + +// Bytes parses string into proto definition. +func Bytes(data []byte) ([]Service, error) { + return parse(bytes.NewBuffer(data), "") +} + +func parse(reader io.Reader, importPath string) ([]Service, error) { + proto, err := pp.NewParser(reader).Parse() + if err != nil { + return nil, err + } + + return parseServices( + proto, + parsePackage(proto), + importPath, + ) +} + +func parsePackage(proto *pp.Proto) string { + for _, e := range proto.Elements { + if p, ok := e.(*pp.Package); ok { + return p.Name + } + } + + return "" +} + +func parseServices(proto *pp.Proto, pkg string, importPath string) ([]Service, error) { + services := make([]Service, 0) + + pp.Walk(proto, pp.WithService(func(service *pp.Service) { + services = append(services, Service{ + Package: pkg, + Name: service.Name, + Methods: parseMethods(service), + }) + })) + + pp.Walk(proto, func(v pp.Visitee) { + if i, ok := v.(*pp.Import); ok { + if im, err := File(importPath+"/"+i.Filename, importPath); err == nil { + services = append(services, im...) + } + } + }) + + return services, nil +} + +func parseMethods(s *pp.Service) []Method { + methods := make([]Method, 0) + for _, e := range s.Elements { + if m, ok := e.(*pp.RPC); ok { + methods = append(methods, Method{ + Name: m.Name, + StreamsRequest: m.StreamsRequest, + RequestType: m.RequestType, + StreamsReturns: m.StreamsReturns, + ReturnsType: m.ReturnsType, + }) + } + } + + return methods +} diff --git a/plugins/grpc/parser/parse_test.go b/plugins/grpc/parser/parse_test.go new file mode 100644 index 00000000..b71c133d --- /dev/null +++ b/plugins/grpc/parser/parse_test.go @@ -0,0 +1,71 @@ +package parser + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseFile(t *testing.T) { + services, err := File("test.proto", "") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseFileWithImportsNestedFolder(t *testing.T) { + services, err := File("./test_nested/test_import.proto", "./test_nested") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseFileWithImports(t *testing.T) { + services, err := File("test_import.proto", ".") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseNotFound(t *testing.T) { + _, err := File("test2.proto", "") + assert.Error(t, err) +} + +func TestParseBytes(t *testing.T) { + services, err := Bytes([]byte{}) + assert.NoError(t, err) + assert.Len(t, services, 0) +} + +func TestParseString(t *testing.T) { + services, err := Bytes([]byte(` +syntax = "proto3"; +package app.namespace; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +} + +message Message { + string msg = 1; + int64 value = 2; +} +`)) + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} diff --git a/plugins/grpc/parser/pong.proto b/plugins/grpc/parser/pong.proto new file mode 100644 index 00000000..9756fabe --- /dev/null +++ b/plugins/grpc/parser/pong.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test.proto b/plugins/grpc/parser/test.proto new file mode 100644 index 00000000..e2230954 --- /dev/null +++ b/plugins/grpc/parser/test.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; +package app.namespace; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +} + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_import.proto b/plugins/grpc/parser/test_import.proto new file mode 100644 index 00000000..1b954fc1 --- /dev/null +++ b/plugins/grpc/parser/test_import.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; +import "pong.proto"; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/message.proto b/plugins/grpc/parser/test_nested/message.proto new file mode 100644 index 00000000..a4012010 --- /dev/null +++ b/plugins/grpc/parser/test_nested/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package app.namespace; + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/pong.proto b/plugins/grpc/parser/test_nested/pong.proto new file mode 100644 index 00000000..9756fabe --- /dev/null +++ b/plugins/grpc/parser/test_nested/pong.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/test_import.proto b/plugins/grpc/parser/test_nested/test_import.proto new file mode 100644 index 00000000..a3a476ba --- /dev/null +++ b/plugins/grpc/parser/test_nested/test_import.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; +import "pong.proto"; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go new file mode 100644 index 00000000..7518d352 --- /dev/null +++ b/plugins/grpc/plugin.go @@ -0,0 +1,195 @@ +package grpc + +import ( + "context" + "sync" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/state/process" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding" +) + +const ( + name string = "grpc" + RrGrpc string = "RR_GRPC" +) + +type Plugin struct { + mu *sync.RWMutex + config *Config + gPool pool.Pool + opts []grpc.ServerOption + services []func(server *grpc.Server) + server *grpc.Server + rrServer server.Server + + // events handler + events events.Handler + log logger.Logger +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { + const op = errors.Op("grpc_plugin_init") + + if !cfg.Has(name) { + return errors.E(errors.Disabled) + } + // register the codec + encoding.RegisterCodec(&codec.Codec{}) + + err := cfg.UnmarshalKey(name, &p.config) + if err != nil { + return errors.E(op, err) + } + + err = p.config.InitDefaults() + if err != nil { + return errors.E(op, err) + } + + p.opts = make([]grpc.ServerOption, 0) + p.services = make([]func(server *grpc.Server), 0) + p.events = events.NewEventsHandler() + p.events.AddListener(p.collectGRPCEvents) + p.rrServer = server + + // worker's GRPC mode + if p.config.Env == nil { + p.config.Env = make(map[string]string) + } + p.config.Env[RrGrpc] = "true" + + p.log = log + p.mu = &sync.RWMutex{} + + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("grpc_plugin_serve") + errCh := make(chan error, 1) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + go func() { + var err error + p.mu.Lock() + p.server, err = p.createGRPCserver() + if err != nil { + p.log.Error("create grpc server", "error", err) + errCh <- errors.E(op, err) + return + } + + l, err := utils.CreateListener(p.config.Listen) + if err != nil { + p.log.Error("create grpc listener", "error", err) + errCh <- errors.E(op, err) + } + + // protect serve + p.mu.Unlock() + err = p.server.Serve(l) + if err != nil { + // skip errors when stopping the server + if err == grpc.ErrServerStopped { + return + } + + p.log.Error("grpc server stopped", "error", err) + errCh <- errors.E(op, err) + return + } + }() + + return errCh +} + +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.server != nil { + p.server.Stop() + } + return nil +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return name +} + +func (p *Plugin) Reset() error { + p.mu.Lock() + defer p.mu.Unlock() + const op = errors.Op("grpc_plugin_reset") + + // destroy old pool + p.gPool.Destroy(context.Background()) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (p *Plugin) Workers() []*process.State { + p.mu.RLock() + defer p.mu.RUnlock() + + workers := p.gPool.Workers() + + ps := make([]*process.State, 0, len(workers)) + for i := 0; i < len(workers); i++ { + state, err := process.WorkerProcessState(workers[i]) + if err != nil { + return nil + } + ps = append(ps, state) + } + + return ps +} + +func (p *Plugin) collectGRPCEvents(event interface{}) { + if gev, ok := event.(events.GRPCEvent); ok { + switch gev.Event { + case events.EventUnaryCallOk: + p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + case events.EventUnaryCallErr: + p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + } + } +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go new file mode 100644 index 00000000..0894a7a8 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go @@ -0,0 +1,68 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package main + +import ( + "io" + "io/ioutil" + "os" + + "github.com/spiral/roadrunner/v2/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php" + "google.golang.org/protobuf/proto" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +func main() { + req, err := readRequest(os.Stdin) + if err != nil { + panic(err) + } + + if err = writeResponse(os.Stdout, php.Generate(req)); err != nil { + panic(err) + } +} + +func readRequest(in io.Reader) (*plugin.CodeGeneratorRequest, error) { + data, err := ioutil.ReadAll(in) + if err != nil { + return nil, err + } + + req := new(plugin.CodeGeneratorRequest) + if err = proto.Unmarshal(data, req); err != nil { + return nil, err + } + + return req, nil +} + +func writeResponse(out io.Writer, resp *plugin.CodeGeneratorResponse) error { + data, err := proto.Marshal(resp) + if err != nil { + return err + } + + _, err = out.Write(data) + return err +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go new file mode 100644 index 00000000..03c48ac8 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go @@ -0,0 +1,57 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package php + +import ( + desc "google.golang.org/protobuf/types/descriptorpb" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +// Generate generates needed service classes +func Generate(req *plugin.CodeGeneratorRequest) *plugin.CodeGeneratorResponse { + resp := &plugin.CodeGeneratorResponse{} + + for _, file := range req.ProtoFile { + for _, service := range file.Service { + resp.File = append(resp.File, generate(req, file, service)) + } + } + + return resp +} + +func generate( + req *plugin.CodeGeneratorRequest, + file *desc.FileDescriptorProto, + service *desc.ServiceDescriptorProto, +) *plugin.CodeGeneratorResponse_File { + return &plugin.CodeGeneratorResponse_File{ + Name: str(filename(file, service.Name)), + Content: str(body(req, file, service)), + } +} + +// helper to convert string into string pointer +func str(str string) *string { + return &str +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go new file mode 100644 index 00000000..32579e33 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go @@ -0,0 +1,139 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package php + +import ( + "bytes" + "strings" + "unicode" +) + +// @see https://github.com/protocolbuffers/protobuf/blob/master/php/ext/google/protobuf/protobuf.c#L168 +var reservedKeywords = []string{ + "abstract", "and", "array", "as", "break", + "callable", "case", "catch", "class", "clone", + "const", "continue", "declare", "default", "die", + "do", "echo", "else", "elseif", "empty", + "enddeclare", "endfor", "endforeach", "endif", "endswitch", + "endwhile", "eval", "exit", "extends", "final", + "for", "foreach", "function", "global", "goto", + "if", "implements", "include", "include_once", "instanceof", + "insteadof", "interface", "isset", "list", "namespace", + "new", "or", "print", "private", "protected", + "public", "require", "require_once", "return", "static", + "switch", "throw", "trait", "try", "unset", + "use", "var", "while", "xor", "int", + "float", "bool", "string", "true", "false", + "null", "void", "iterable", +} + +// Check if given name/keyword is reserved by php. +func isReserved(name string) bool { + name = strings.ToLower(name) + for _, k := range reservedKeywords { + if name == k { + return true + } + } + + return false +} + +// generate php namespace or path +func namespace(pkg *string, sep string) string { + if pkg == nil { + return "" + } + + result := bytes.NewBuffer(nil) + for _, p := range strings.Split(*pkg, ".") { + result.WriteString(identifier(p, "")) + result.WriteString(sep) + } + + return strings.Trim(result.String(), sep) +} + +// create php identifier for class or message +func identifier(name string, suffix string) string { + name = Camelize(name) + if suffix != "" { + return name + Camelize(suffix) + } + + return name +} + +func resolveReserved(identifier string, pkg string) string { + if isReserved(strings.ToLower(identifier)) { + if pkg == ".google.protobuf" { + return "GPB" + identifier + } + return "PB" + identifier + } + + return identifier +} + +// Camelize "dino_party" -> "DinoParty" +func Camelize(word string) string { + words := splitAtCaseChangeWithTitlecase(word) + return strings.Join(words, "") +} + +func splitAtCaseChangeWithTitlecase(s string) []string { + words := make([]string, 0) + word := make([]rune, 0) + for _, c := range s { + spacer := isSpacerChar(c) + if len(word) > 0 { + if unicode.IsUpper(c) || spacer { + words = append(words, string(word)) + word = make([]rune, 0) + } + } + if !spacer { + if len(word) > 0 { + word = append(word, unicode.ToLower(c)) + } else { + word = append(word, unicode.ToUpper(c)) + } + } + } + words = append(words, string(word)) + return words +} + +func isSpacerChar(c rune) bool { + switch { + case c == rune("_"[0]): + return true + case c == rune(" "[0]): + return true + case c == rune(":"[0]): + return true + case c == rune("-"[0]): + return true + } + return false +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go new file mode 100644 index 00000000..c1dc3898 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go @@ -0,0 +1,103 @@ +package php + +import ( + "bytes" + "fmt" + "strings" + + desc "google.golang.org/protobuf/types/descriptorpb" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +// manages internal name representation of the package +type ns struct { + // Package defines file package. + Package string + + // Root namespace of the package + Namespace string + + // Import declares what namespaces to be imported + Import map[string]string +} + +// newNamespace creates new work namespace. +func newNamespace(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) *ns { + ns := &ns{ + Package: *file.Package, + Namespace: namespace(file.Package, "\\"), + Import: make(map[string]string), + } + + if file.Options != nil && file.Options.PhpNamespace != nil { + ns.Namespace = *file.Options.PhpNamespace + } + + for k := range service.Method { + ns.importMessage(req, service.Method[k].InputType) + ns.importMessage(req, service.Method[k].OutputType) + } + + return ns +} + +// importMessage registers new import message namespace (only the namespace). +func (ns *ns) importMessage(req *plugin.CodeGeneratorRequest, msg *string) { + if msg == nil { + return + } + + chunks := strings.Split(*msg, ".") + pkg := strings.Join(chunks[:len(chunks)-1], ".") + + result := bytes.NewBuffer(nil) + for _, p := range chunks[:len(chunks)-1] { + result.WriteString(identifier(p, "")) + result.WriteString(`\`) + } + + if pkg == "."+ns.Package { + // root package + return + } + + for _, f := range req.ProtoFile { + if pkg == "."+*f.Package { + if f.Options != nil && f.Options.PhpNamespace != nil { + // custom imported namespace + ns.Import[pkg] = *f.Options.PhpNamespace + return + } + } + } + + ns.Import[pkg] = strings.Trim(result.String(), `\`) +} + +// resolve message alias +func (ns *ns) resolve(msg *string) string { + chunks := strings.Split(*msg, ".") + pkg := strings.Join(chunks[:len(chunks)-1], ".") + + if pkg == "."+ns.Package { + // root message + return identifier(chunks[len(chunks)-1], "") + } + + for iPkg, ns := range ns.Import { + if pkg == iPkg { + // use last namespace chunk + nsChunks := strings.Split(ns, `\`) + identifier := identifier(chunks[len(chunks)-1], "") + + return fmt.Sprintf( + `%s\%s`, + nsChunks[len(nsChunks)-1], + resolveReserved(identifier, pkg), + ) + } + } + + // fully clarified name (fallback) + return "\\" + namespace(msg, "\\") +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go new file mode 100644 index 00000000..e00c6fdd --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go @@ -0,0 +1,103 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package php + +import ( + "bytes" + "fmt" + "strings" + "text/template" + + desc "google.golang.org/protobuf/types/descriptorpb" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +const phpBody = `<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: {{ .File.Name }} +{{ $ns := .Namespace -}} +{{if $ns.Namespace}} +namespace {{ $ns.Namespace }}; +{{end}} +use Spiral\GRPC; +{{- range $n := $ns.Import}} +use {{ $n }}; +{{- end}} + +interface {{ .Service.Name | interface }} extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "{{ .File.Package }}.{{ .Service.Name }}";{{ "\n" }} +{{- range $m := .Service.Method}} + /** + * @param GRPC\ContextInterface $ctx + * @param {{ name $ns $m.InputType }} $in + * @return {{ name $ns $m.OutputType }} + * + * @throws GRPC\Exception\InvokeException + */ + public function {{ $m.Name }}(GRPC\ContextInterface $ctx, {{ name $ns $m.InputType }} $in): {{ name $ns $m.OutputType }}; +{{end -}} +} +` + +// generate php filename +func filename(file *desc.FileDescriptorProto, name *string) string { + ns := namespace(file.Package, "/") + if file.Options != nil && file.Options.PhpNamespace != nil { + ns = strings.ReplaceAll(*file.Options.PhpNamespace, `\`, `/`) + } + + return fmt.Sprintf("%s/%s.php", ns, identifier(*name, "interface")) +} + +// generate php file body +func body(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) string { + out := bytes.NewBuffer(nil) + + data := struct { + Namespace *ns + File *desc.FileDescriptorProto + Service *desc.ServiceDescriptorProto + }{ + Namespace: newNamespace(req, file, service), + File: file, + Service: service, + } + + tpl := template.Must(template.New("phpBody").Funcs(template.FuncMap{ + "interface": func(name *string) string { + return identifier(*name, "interface") + }, + "name": func(ns *ns, name *string) string { + return ns.resolve(name) + }, + }).Parse(phpBody)) + + err := tpl.Execute(out, data) + if err != nil { + panic(err) + } + + return out.String() +} diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go new file mode 100644 index 00000000..074aac85 --- /dev/null +++ b/plugins/grpc/proxy/proxy.go @@ -0,0 +1,219 @@ +package proxy + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +const ( + peerAddr string = ":peer.address" + peerAuthType string = ":peer.auth-type" + delimiter string = "|:|" +) + +// base interface for Proxy class +type proxyService interface { + // RegisterMethod registers new RPC method. + RegisterMethod(method string) + + // ServiceDesc returns service description for the proxy. + ServiceDesc() *grpc.ServiceDesc +} + +// carry details about service, method and RPC context to PHP process +type rpcContext struct { + Service string `json:"service"` + Method string `json:"method"` + Context map[string][]string `json:"context"` +} + +// Proxy manages GRPC/RoadRunner bridge. +type Proxy struct { + mu *sync.RWMutex + grpcPool pool.Pool + name string + metadata string + methods []string +} + +// NewProxy creates new service proxy object. +func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy { + return &Proxy{ + mu: mu, + grpcPool: grpcPool, + name: name, + metadata: metadata, + methods: make([]string, 0), + } +} + +// RegisterMethod registers new RPC method. +func (p *Proxy) RegisterMethod(method string) { + p.methods = append(p.methods, method) +} + +// ServiceDesc returns service description for the proxy. +func (p *Proxy) ServiceDesc() *grpc.ServiceDesc { + desc := &grpc.ServiceDesc{ + ServiceName: p.name, + Metadata: p.metadata, + HandlerType: (*proxyService)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + } + + // Registering methods + for _, m := range p.methods { + desc.Methods = append(desc.Methods, grpc.MethodDesc{ + MethodName: m, + Handler: p.methodHandler(m), + }) + } + + return desc +} + +// Generate method handler proxy. +func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := codec.RawMessage{} + if err := dec(&in); err != nil { + return nil, wrapError(err) + } + + if interceptor == nil { + return p.invoke(ctx, method, in) + } + + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: fmt.Sprintf("/%s/%s", p.name, method), + } + + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return p.invoke(ctx, method, req.(codec.RawMessage)) + } + + return interceptor(ctx, in, info, handler) + } +} + +func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) { + payload, err := p.makePayload(ctx, method, in) + if err != nil { + return nil, err + } + + p.mu.RLock() + resp, err := p.grpcPool.Exec(payload) + p.mu.RUnlock() + + if err != nil { + return nil, wrapError(err) + } + + md, err := p.responseMetadata(resp) + if err != nil { + return nil, err + } + ctx = metadata.NewIncomingContext(ctx, md) + err = grpc.SetHeader(ctx, md) + if err != nil { + return nil, err + } + + return codec.RawMessage(resp.Body), nil +} + +// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD +func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) { + var md metadata.MD + if resp == nil || len(resp.Context) == 0 { + return md, nil + } + + var rpcMetadata map[string]string + err := json.Unmarshal(resp.Context, &rpcMetadata) + if err != nil { + return md, err + } + + if len(rpcMetadata) > 0 { + md = metadata.New(rpcMetadata) + } + + return md, nil +} + +// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error +func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) { + ctxMD := make(map[string][]string) + + if md, ok := metadata.FromIncomingContext(ctx); ok { + for k, v := range md { + ctxMD[k] = v + } + } + + if pr, ok := peer.FromContext(ctx); ok { + ctxMD[peerAddr] = []string{pr.Addr.String()} + if pr.AuthInfo != nil { + ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()} + } + } + + ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD}) + + if err != nil { + return nil, err + } + + return &payload.Payload{Context: ctxData, Body: body}, nil +} + +// mounts proper error code for the error +func wrapError(err error) error { + // internal agreement + if strings.Contains(err.Error(), delimiter) { + chunks := strings.Split(err.Error(), delimiter) + code := codes.Internal + + // protect the slice access + if len(chunks) < 2 { + return err + } + + if phpCode, errConv := strconv.ParseUint(chunks[0], 10, 32); errConv == nil { + code = codes.Code(phpCode) + } + + st := status.New(code, chunks[1]).Proto() + + for _, detailsMessage := range chunks[2:] { + anyDetailsMessage := anypb.Any{} + errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage) + if errP == nil { + st.Details = append(st.Details, &anyDetailsMessage) + } + } + + return status.ErrorProto(st) + } + + return status.Error(codes.Internal, err.Error()) +} diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go new file mode 100644 index 00000000..2c024ee3 --- /dev/null +++ b/plugins/grpc/proxy/proxy_test.go @@ -0,0 +1,134 @@ +package proxy + +// import ( +// "testing" +// "time" + +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/stretchr/testify/assert" +// "golang.org/x/net/context" +// "google.golang.org/grpc" +// "google.golang.org/grpc/codes" +// "google.golang.org/grpc/metadata" +// "google.golang.org/grpc/status" +// ) + +// const addr = "localhost:9080" + +// func Test_Proxy_Error(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) + +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) + +// assert.NoError(t, c.Init(&testCfg{ +// grpcCfg: `{ +// "listen": "tcp://:9080", +// "tls": { +// "key": "tests/server.key", +// "cert": "tests/server.crt" +// }, +// "proto": "tests/test.proto", +// "workers":{ +// "command": "php tests/worker.php", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10, +// "destroyTimeout": 10 +// } +// } +// }`, +// })) + +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) + +// // should do nothing +// s.(*Service).Stop() + +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() + +// cl, cn := getClient(addr) +// defer cn.Close() + +// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"}) + +// assert.Error(t, err) +// se, _ := status.FromError(err) +// assert.Equal(t, "nothing here", se.Message()) +// assert.Equal(t, codes.NotFound, se.Code()) + +// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"}) + +// assert.Error(t, errWithDetails) +// statusWithDetails, _ := status.FromError(errWithDetails) +// assert.Equal(t, "main exception message", statusWithDetails.Message()) +// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code()) + +// details := statusWithDetails.Details() + +// detailsMessageForException := details[0].(*tests.DetailsMessageForException) + +// assert.Equal(t, detailsMessageForException.Code, uint64(1)) +// assert.Equal(t, detailsMessageForException.Message, "details message") +// } + +// func Test_Proxy_Metadata(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) + +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) + +// assert.NoError(t, c.Init(&testCfg{ +// grpcCfg: `{ +// "listen": "tcp://:9080", +// "tls": { +// "key": "tests/server.key", +// "cert": "tests/server.crt" +// }, +// "proto": "tests/test.proto", +// "workers":{ +// "command": "php tests/worker.php", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10, +// "destroyTimeout": 10 +// } +// } +// }`, +// })) + +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) + +// // should do nothing +// s.(*Service).Stop() + +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() + +// cl, cn := getClient(addr) +// defer cn.Close() + +// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value") +// var header metadata.MD +// out, err := cl.Info( +// ctx, +// &tests.Message{Msg: "MD"}, +// grpc.Header(&header), +// grpc.WaitForReady(true), +// ) +// assert.Equal(t, []string{"bar"}, header.Get("foo")) +// assert.NoError(t, err) +// assert.Equal(t, `["proxy-value"]`, out.Msg) +// } diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go new file mode 100644 index 00000000..323f73a0 --- /dev/null +++ b/plugins/grpc/server.go @@ -0,0 +1,154 @@ +package grpc + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "path" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/grpc/parser" + "github.com/spiral/roadrunner/v2/plugins/grpc/proxy" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" +) + +func (p *Plugin) createGRPCserver() (*grpc.Server, error) { + const op = errors.Op("grpc_plugin_create_server") + opts, err := p.serverOptions() + if err != nil { + return nil, errors.E(op, err) + } + + server := grpc.NewServer(opts...) + + if p.config.Proto != "" { + // php proxy services + services, err := parser.File(p.config.Proto, path.Dir(p.config.Proto)) + if err != nil { + return nil, err + } + + for _, service := range services { + p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool, p.mu) + for _, m := range service.Methods { + p.RegisterMethod(m.Name) + } + + server.RegisterService(p.ServiceDesc(), p) + } + } + + // external and native services + for _, r := range p.services { + r(server) + } + + return server, nil +} + +func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + start := time.Now() + resp, err := handler(ctx, req) + if err != nil { + p.events.Push(events.GRPCEvent{ + Event: events.EventUnaryCallErr, + Info: info, + Error: err, + Start: start, + Elapsed: time.Since(start), + }) + + return nil, err + } + + p.events.Push(events.GRPCEvent{ + Event: events.EventUnaryCallOk, + Info: info, + Start: start, + Elapsed: time.Since(start), + }) + + return resp, nil +} + +func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) { + const op = errors.Op("grpc_plugin_server_options") + + var tcreds credentials.TransportCredentials + var opts []grpc.ServerOption + var cert tls.Certificate + var certPool *x509.CertPool + var rca []byte + var err error + + if p.config.EnableTLS() { + // if client CA is not empty we combine it with Cert and Key + if p.config.TLS.RootCA != "" { + cert, err = tls.LoadX509KeyPair(p.config.TLS.Cert, p.config.TLS.Key) + if err != nil { + return nil, err + } + + certPool, err = x509.SystemCertPool() + if err != nil { + return nil, err + } + if certPool == nil { + certPool = x509.NewCertPool() + } + + rca, err = os.ReadFile(p.config.TLS.RootCA) + if err != nil { + return nil, err + } + + if ok := certPool.AppendCertsFromPEM(rca); !ok { + return nil, errors.E(op, errors.Str("could not append Certs from PEM")) + } + + tcreds = credentials.NewTLS(&tls.Config{ + MinVersion: tls.VersionTLS12, + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{cert}, + ClientCAs: certPool, + }) + } else { + tcreds, err = credentials.NewServerTLSFromFile(p.config.TLS.Cert, p.config.TLS.Key) + if err != nil { + return nil, err + } + } + + serverOptions := []grpc.ServerOption{ + grpc.MaxSendMsgSize(int(p.config.MaxSendMsgSize)), + grpc.MaxRecvMsgSize(int(p.config.MaxRecvMsgSize)), + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: p.config.MaxConnectionIdle, + MaxConnectionAge: p.config.MaxConnectionAge, + MaxConnectionAgeGrace: p.config.MaxConnectionAge, + Time: p.config.PingTime, + Timeout: p.config.Timeout, + }), + grpc.MaxConcurrentStreams(uint32(p.config.MaxConcurrentStreams)), + } + + opts = append(opts, grpc.Creds(tcreds)) + opts = append(opts, serverOptions...) + } + + opts = append(opts, p.opts...) + + // custom codec is required to bypass protobuf, common interceptor used for debug and stats + return append( + opts, + grpc.UnaryInterceptor(p.interceptor), + // TODO(rustatian): check deprecation + // grpc.CustomCodec(&codec{encoding.GetCodec(encCodec)}), + ), nil +} |