diff options
Diffstat (limited to 'plugins/grpc')
21 files changed, 0 insertions, 1686 deletions
diff --git a/plugins/grpc/codec/codec.go b/plugins/grpc/codec/codec.go deleted file mode 100644 index a9d89ac5..00000000 --- a/plugins/grpc/codec/codec.go +++ /dev/null @@ -1,44 +0,0 @@ -package codec - -import "google.golang.org/grpc/encoding" - -type RawMessage []byte - -// By default, gRPC registers and uses the "proto" codec, so it is not necessary to do this in your own code to send and receive proto messages. -// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec -const cName string = "proto" -const rm string = "rawMessage" - -func (r RawMessage) Reset() {} -func (RawMessage) ProtoMessage() {} -func (RawMessage) String() string { return rm } - -type Codec struct{ base encoding.Codec } - -// Marshal returns the wire format of v. rawMessages would be returned without encoding. -func (c *Codec) Marshal(v interface{}) ([]byte, error) { - if raw, ok := v.(RawMessage); ok { - return raw, nil - } - - return c.base.Marshal(v) -} - -// Unmarshal parses the wire format into v. rawMessages would not be unmarshalled. -func (c *Codec) Unmarshal(data []byte, v interface{}) error { - if raw, ok := v.(*RawMessage); ok { - *raw = data - return nil - } - - return c.base.Unmarshal(data, v) -} - -func (c *Codec) Name() string { - return cName -} - -// String return codec name. -func (c *Codec) String() string { - return "raw:" + c.base.Name() -} diff --git a/plugins/grpc/codec/codec_test.go b/plugins/grpc/codec/codec_test.go deleted file mode 100644 index 60efb072..00000000 --- a/plugins/grpc/codec/codec_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package codec - -import ( - "testing" - - json "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" -) - -type jsonCodec struct{} - -func (jsonCodec) Marshal(v interface{}) ([]byte, error) { - return json.Marshal(v) -} - -func (jsonCodec) Unmarshal(data []byte, v interface{}) error { - return json.Unmarshal(data, v) -} - -func (jsonCodec) Name() string { - return "json" -} - -func TestCodec_String(t *testing.T) { - c := Codec{jsonCodec{}} - - assert.Equal(t, "raw:json", c.String()) - - r := RawMessage{} - r.Reset() - r.ProtoMessage() - assert.Equal(t, "rawMessage", r.String()) -} - -func TestCodec_Unmarshal_ByPass(t *testing.T) { - c := Codec{jsonCodec{}} - - s := struct { - Name string - }{} - - assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) - assert.Equal(t, "name", s.Name) -} - -func TestCodec_Marshal_ByPass(t *testing.T) { - c := Codec{jsonCodec{}} - - s := struct { - Name string - }{ - Name: "name", - } - - d, err := c.Marshal(s) - assert.NoError(t, err) - - assert.Equal(t, `{"Name":"name"}`, string(d)) -} - -func TestCodec_Unmarshal_Raw(t *testing.T) { - c := Codec{jsonCodec{}} - - s := RawMessage{} - - assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) - assert.Equal(t, `{"name":"name"}`, string(s)) -} - -func TestCodec_Marshal_Raw(t *testing.T) { - c := Codec{jsonCodec{}} - - s := RawMessage(`{"Name":"name"}`) - - d, err := c.Marshal(s) - assert.NoError(t, err) - - assert.Equal(t, `{"Name":"name"}`, string(d)) -} diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go deleted file mode 100644 index fedd4998..00000000 --- a/plugins/grpc/config.go +++ /dev/null @@ -1,128 +0,0 @@ -package grpc - -import ( - "math" - "os" - "strings" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pool" -) - -type Config struct { - Listen string `mapstructure:"listen"` - Proto string `mapstructure:"proto"` - - TLS *TLS - - // Env is environment variables passed to the http pool - Env map[string]string `mapstructure:"env"` - - GrpcPool *pool.Config `mapstructure:"pool"` - MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"` - MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"` - MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"` - MaxConnectionAge time.Duration `mapstructure:"max_connection_age"` - MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace"` - MaxConcurrentStreams int64 `mapstructure:"max_concurrent_streams"` - PingTime time.Duration `mapstructure:"ping_time"` - Timeout time.Duration `mapstructure:"timeout"` -} - -type TLS struct { - Key string - Cert string - RootCA string -} - -func (c *Config) InitDefaults() error { //nolint:gocognit - const op = errors.Op("grpc_plugin_config") - if c.GrpcPool == nil { - c.GrpcPool = &pool.Config{} - } - - c.GrpcPool.InitDefaults() - - if !strings.Contains(c.Listen, ":") { - return errors.E(op, errors.Errorf("mailformed grpc grpc address, provided: %s", c.Listen)) - } - - if c.EnableTLS() { - if _, err := os.Stat(c.TLS.Key); err != nil { - if os.IsNotExist(err) { - return errors.E(op, errors.Errorf("key file '%s' does not exists", c.TLS.Key)) - } - - return errors.E(op, err) - } - - if _, err := os.Stat(c.TLS.Cert); err != nil { - if os.IsNotExist(err) { - return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert)) - } - - return errors.E(op, err) - } - - // RootCA is optional, but if provided - check it - if c.TLS.RootCA != "" { - if _, err := os.Stat(c.TLS.RootCA); err != nil { - if os.IsNotExist(err) { - return errors.E(op, errors.Errorf("root ca path provided, but key file '%s' does not exists", c.TLS.RootCA)) - } - return errors.E(op, err) - } - } - } - - // used to set max time - infinity := time.Duration(math.MaxInt64) - - if c.PingTime == 0 { - c.PingTime = time.Hour * 2 - } - - if c.Timeout == 0 { - c.Timeout = time.Second * 20 - } - - if c.MaxConcurrentStreams == 0 { - c.MaxConcurrentStreams = 10 - } - // set default - if c.MaxConnectionAge == 0 { - c.MaxConnectionAge = infinity - } - - // set default - if c.MaxConnectionIdle == 0 { - c.MaxConnectionIdle = infinity - } - - if c.MaxConnectionAgeGrace == 0 { - c.MaxConnectionAgeGrace = infinity - } - - if c.MaxRecvMsgSize == 0 { - c.MaxRecvMsgSize = 1024 * 1024 * 50 - } else { - c.MaxRecvMsgSize = 1024 * 1024 * c.MaxRecvMsgSize - } - - if c.MaxSendMsgSize == 0 { - c.MaxSendMsgSize = 1024 * 1024 * 50 - } else { - c.MaxSendMsgSize = 1024 * 1024 * c.MaxSendMsgSize - } - - return nil -} - -func (c *Config) EnableTLS() bool { - if c.TLS != nil { - return (c.TLS.RootCA != "" && c.TLS.Key != "" && c.TLS.Cert != "") || (c.TLS.Key != "" && c.TLS.Cert != "") - } - - return false -} diff --git a/plugins/grpc/parser/message.proto b/plugins/grpc/parser/message.proto deleted file mode 100644 index a4012010..00000000 --- a/plugins/grpc/parser/message.proto +++ /dev/null @@ -1,7 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -message Message { - string msg = 1; - int64 value = 2; -}
\ No newline at end of file diff --git a/plugins/grpc/parser/parse.go b/plugins/grpc/parser/parse.go deleted file mode 100644 index d59b0927..00000000 --- a/plugins/grpc/parser/parse.go +++ /dev/null @@ -1,114 +0,0 @@ -package parser - -import ( - "bytes" - "io" - "os" - - pp "github.com/emicklei/proto" -) - -// Service contains information about singular GRPC service. -type Service struct { - // Package defines service namespace. - Package string - - // Name defines service name. - Name string - - // Methods list. - Methods []Method -} - -// Method describes singular RPC method. -type Method struct { - // Name is method name. - Name string - - // StreamsRequest defines if method accept stream input. - StreamsRequest bool - - // RequestType defines message name (from the same package) of method input. - RequestType string - - // StreamsReturns defines if method streams result. - StreamsReturns bool - - // ReturnsType defines message name (from the same package) of method return value. - ReturnsType string -} - -// File parses given proto file or returns error. -func File(file string, importPath string) ([]Service, error) { - reader, _ := os.Open(file) - defer reader.Close() - - return parse(reader, importPath) -} - -// Bytes parses string into proto definition. -func Bytes(data []byte) ([]Service, error) { - return parse(bytes.NewBuffer(data), "") -} - -func parse(reader io.Reader, importPath string) ([]Service, error) { - proto, err := pp.NewParser(reader).Parse() - if err != nil { - return nil, err - } - - return parseServices( - proto, - parsePackage(proto), - importPath, - ) -} - -func parsePackage(proto *pp.Proto) string { - for _, e := range proto.Elements { - if p, ok := e.(*pp.Package); ok { - return p.Name - } - } - - return "" -} - -func parseServices(proto *pp.Proto, pkg string, importPath string) ([]Service, error) { - services := make([]Service, 0) - - pp.Walk(proto, pp.WithService(func(service *pp.Service) { - services = append(services, Service{ - Package: pkg, - Name: service.Name, - Methods: parseMethods(service), - }) - })) - - pp.Walk(proto, func(v pp.Visitee) { - if i, ok := v.(*pp.Import); ok { - if im, err := File(importPath+"/"+i.Filename, importPath); err == nil { - services = append(services, im...) - } - } - }) - - return services, nil -} - -func parseMethods(s *pp.Service) []Method { - methods := make([]Method, 0) - for _, e := range s.Elements { - if m, ok := e.(*pp.RPC); ok { - methods = append(methods, Method{ - Name: m.Name, - StreamsRequest: m.StreamsRequest, - RequestType: m.RequestType, - StreamsReturns: m.StreamsReturns, - ReturnsType: m.ReturnsType, - }) - } - } - - return methods -} diff --git a/plugins/grpc/parser/parse_test.go b/plugins/grpc/parser/parse_test.go deleted file mode 100644 index b71c133d..00000000 --- a/plugins/grpc/parser/parse_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package parser - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestParseFile(t *testing.T) { - services, err := File("test.proto", "") - assert.NoError(t, err) - assert.Len(t, services, 2) - - assert.Equal(t, "app.namespace", services[0].Package) -} - -func TestParseFileWithImportsNestedFolder(t *testing.T) { - services, err := File("./test_nested/test_import.proto", "./test_nested") - assert.NoError(t, err) - assert.Len(t, services, 2) - - assert.Equal(t, "app.namespace", services[0].Package) -} - -func TestParseFileWithImports(t *testing.T) { - services, err := File("test_import.proto", ".") - assert.NoError(t, err) - assert.Len(t, services, 2) - - assert.Equal(t, "app.namespace", services[0].Package) -} - -func TestParseNotFound(t *testing.T) { - _, err := File("test2.proto", "") - assert.Error(t, err) -} - -func TestParseBytes(t *testing.T) { - services, err := Bytes([]byte{}) - assert.NoError(t, err) - assert.Len(t, services, 0) -} - -func TestParseString(t *testing.T) { - services, err := Bytes([]byte(` -syntax = "proto3"; -package app.namespace; - -// Ping Service. -service PingService { - // Ping Method. - rpc Ping (Message) returns (Message) { - } -} - -// Pong service. -service PongService { - rpc Pong (stream Message) returns (stream Message) { - } -} - -message Message { - string msg = 1; - int64 value = 2; -} -`)) - assert.NoError(t, err) - assert.Len(t, services, 2) - - assert.Equal(t, "app.namespace", services[0].Package) -} diff --git a/plugins/grpc/parser/pong.proto b/plugins/grpc/parser/pong.proto deleted file mode 100644 index 9756fabe..00000000 --- a/plugins/grpc/parser/pong.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -import "message.proto"; - -// Pong service. -service PongService { - rpc Pong (stream Message) returns (stream Message) { - } -}
\ No newline at end of file diff --git a/plugins/grpc/parser/test.proto b/plugins/grpc/parser/test.proto deleted file mode 100644 index e2230954..00000000 --- a/plugins/grpc/parser/test.proto +++ /dev/null @@ -1,20 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -// Ping Service. -service PingService { - // Ping Method. - rpc Ping (Message) returns (Message) { - } -} - -// Pong service. -service PongService { - rpc Pong (stream Message) returns (stream Message) { - } -} - -message Message { - string msg = 1; - int64 value = 2; -}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_import.proto b/plugins/grpc/parser/test_import.proto deleted file mode 100644 index 1b954fc1..00000000 --- a/plugins/grpc/parser/test_import.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -import "message.proto"; -import "pong.proto"; - -// Ping Service. -service PingService { - // Ping Method. - rpc Ping (Message) returns (Message) { - } -}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/message.proto b/plugins/grpc/parser/test_nested/message.proto deleted file mode 100644 index a4012010..00000000 --- a/plugins/grpc/parser/test_nested/message.proto +++ /dev/null @@ -1,7 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -message Message { - string msg = 1; - int64 value = 2; -}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/pong.proto b/plugins/grpc/parser/test_nested/pong.proto deleted file mode 100644 index 9756fabe..00000000 --- a/plugins/grpc/parser/test_nested/pong.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -import "message.proto"; - -// Pong service. -service PongService { - rpc Pong (stream Message) returns (stream Message) { - } -}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/test_import.proto b/plugins/grpc/parser/test_nested/test_import.proto deleted file mode 100644 index a3a476ba..00000000 --- a/plugins/grpc/parser/test_nested/test_import.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; -package app.namespace; - -import "message.proto"; -import "pong.proto"; - -// Ping Service. -service PingService { - // Ping Method. - rpc Ping (Message) returns (Message) { - } -} diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go deleted file mode 100644 index 7518d352..00000000 --- a/plugins/grpc/plugin.go +++ /dev/null @@ -1,195 +0,0 @@ -package grpc - -import ( - "context" - "sync" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/grpc/codec" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/utils" - "google.golang.org/grpc" - "google.golang.org/grpc/encoding" -) - -const ( - name string = "grpc" - RrGrpc string = "RR_GRPC" -) - -type Plugin struct { - mu *sync.RWMutex - config *Config - gPool pool.Pool - opts []grpc.ServerOption - services []func(server *grpc.Server) - server *grpc.Server - rrServer server.Server - - // events handler - events events.Handler - log logger.Logger -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { - const op = errors.Op("grpc_plugin_init") - - if !cfg.Has(name) { - return errors.E(errors.Disabled) - } - // register the codec - encoding.RegisterCodec(&codec.Codec{}) - - err := cfg.UnmarshalKey(name, &p.config) - if err != nil { - return errors.E(op, err) - } - - err = p.config.InitDefaults() - if err != nil { - return errors.E(op, err) - } - - p.opts = make([]grpc.ServerOption, 0) - p.services = make([]func(server *grpc.Server), 0) - p.events = events.NewEventsHandler() - p.events.AddListener(p.collectGRPCEvents) - p.rrServer = server - - // worker's GRPC mode - if p.config.Env == nil { - p.config.Env = make(map[string]string) - } - p.config.Env[RrGrpc] = "true" - - p.log = log - p.mu = &sync.RWMutex{} - - return nil -} - -func (p *Plugin) Serve() chan error { - const op = errors.Op("grpc_plugin_serve") - errCh := make(chan error, 1) - - var err error - p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ - Debug: p.config.GrpcPool.Debug, - NumWorkers: p.config.GrpcPool.NumWorkers, - MaxJobs: p.config.GrpcPool.MaxJobs, - AllocateTimeout: p.config.GrpcPool.AllocateTimeout, - DestroyTimeout: p.config.GrpcPool.DestroyTimeout, - Supervisor: p.config.GrpcPool.Supervisor, - }, p.config.Env, p.collectGRPCEvents) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - go func() { - var err error - p.mu.Lock() - p.server, err = p.createGRPCserver() - if err != nil { - p.log.Error("create grpc server", "error", err) - errCh <- errors.E(op, err) - return - } - - l, err := utils.CreateListener(p.config.Listen) - if err != nil { - p.log.Error("create grpc listener", "error", err) - errCh <- errors.E(op, err) - } - - // protect serve - p.mu.Unlock() - err = p.server.Serve(l) - if err != nil { - // skip errors when stopping the server - if err == grpc.ErrServerStopped { - return - } - - p.log.Error("grpc server stopped", "error", err) - errCh <- errors.E(op, err) - return - } - }() - - return errCh -} - -func (p *Plugin) Stop() error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.server != nil { - p.server.Stop() - } - return nil -} - -func (p *Plugin) Available() {} - -func (p *Plugin) Name() string { - return name -} - -func (p *Plugin) Reset() error { - p.mu.Lock() - defer p.mu.Unlock() - const op = errors.Op("grpc_plugin_reset") - - // destroy old pool - p.gPool.Destroy(context.Background()) - - var err error - p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ - Debug: p.config.GrpcPool.Debug, - NumWorkers: p.config.GrpcPool.NumWorkers, - MaxJobs: p.config.GrpcPool.MaxJobs, - AllocateTimeout: p.config.GrpcPool.AllocateTimeout, - DestroyTimeout: p.config.GrpcPool.DestroyTimeout, - Supervisor: p.config.GrpcPool.Supervisor, - }, p.config.Env, p.collectGRPCEvents) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (p *Plugin) Workers() []*process.State { - p.mu.RLock() - defer p.mu.RUnlock() - - workers := p.gPool.Workers() - - ps := make([]*process.State, 0, len(workers)) - for i := 0; i < len(workers); i++ { - state, err := process.WorkerProcessState(workers[i]) - if err != nil { - return nil - } - ps = append(ps, state) - } - - return ps -} - -func (p *Plugin) collectGRPCEvents(event interface{}) { - if gev, ok := event.(events.GRPCEvent); ok { - switch gev.Event { - case events.EventUnaryCallOk: - p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) - case events.EventUnaryCallErr: - p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) - } - } -} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go deleted file mode 100644 index 0894a7a8..00000000 --- a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go +++ /dev/null @@ -1,68 +0,0 @@ -// MIT License -// -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package main - -import ( - "io" - "io/ioutil" - "os" - - "github.com/spiral/roadrunner/v2/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php" - "google.golang.org/protobuf/proto" - plugin "google.golang.org/protobuf/types/pluginpb" -) - -func main() { - req, err := readRequest(os.Stdin) - if err != nil { - panic(err) - } - - if err = writeResponse(os.Stdout, php.Generate(req)); err != nil { - panic(err) - } -} - -func readRequest(in io.Reader) (*plugin.CodeGeneratorRequest, error) { - data, err := ioutil.ReadAll(in) - if err != nil { - return nil, err - } - - req := new(plugin.CodeGeneratorRequest) - if err = proto.Unmarshal(data, req); err != nil { - return nil, err - } - - return req, nil -} - -func writeResponse(out io.Writer, resp *plugin.CodeGeneratorResponse) error { - data, err := proto.Marshal(resp) - if err != nil { - return err - } - - _, err = out.Write(data) - return err -} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go deleted file mode 100644 index 03c48ac8..00000000 --- a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go +++ /dev/null @@ -1,57 +0,0 @@ -// MIT License -// -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package php - -import ( - desc "google.golang.org/protobuf/types/descriptorpb" - plugin "google.golang.org/protobuf/types/pluginpb" -) - -// Generate generates needed service classes -func Generate(req *plugin.CodeGeneratorRequest) *plugin.CodeGeneratorResponse { - resp := &plugin.CodeGeneratorResponse{} - - for _, file := range req.ProtoFile { - for _, service := range file.Service { - resp.File = append(resp.File, generate(req, file, service)) - } - } - - return resp -} - -func generate( - req *plugin.CodeGeneratorRequest, - file *desc.FileDescriptorProto, - service *desc.ServiceDescriptorProto, -) *plugin.CodeGeneratorResponse_File { - return &plugin.CodeGeneratorResponse_File{ - Name: str(filename(file, service.Name)), - Content: str(body(req, file, service)), - } -} - -// helper to convert string into string pointer -func str(str string) *string { - return &str -} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go deleted file mode 100644 index 32579e33..00000000 --- a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go +++ /dev/null @@ -1,139 +0,0 @@ -// MIT License -// -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package php - -import ( - "bytes" - "strings" - "unicode" -) - -// @see https://github.com/protocolbuffers/protobuf/blob/master/php/ext/google/protobuf/protobuf.c#L168 -var reservedKeywords = []string{ - "abstract", "and", "array", "as", "break", - "callable", "case", "catch", "class", "clone", - "const", "continue", "declare", "default", "die", - "do", "echo", "else", "elseif", "empty", - "enddeclare", "endfor", "endforeach", "endif", "endswitch", - "endwhile", "eval", "exit", "extends", "final", - "for", "foreach", "function", "global", "goto", - "if", "implements", "include", "include_once", "instanceof", - "insteadof", "interface", "isset", "list", "namespace", - "new", "or", "print", "private", "protected", - "public", "require", "require_once", "return", "static", - "switch", "throw", "trait", "try", "unset", - "use", "var", "while", "xor", "int", - "float", "bool", "string", "true", "false", - "null", "void", "iterable", -} - -// Check if given name/keyword is reserved by php. -func isReserved(name string) bool { - name = strings.ToLower(name) - for _, k := range reservedKeywords { - if name == k { - return true - } - } - - return false -} - -// generate php namespace or path -func namespace(pkg *string, sep string) string { - if pkg == nil { - return "" - } - - result := bytes.NewBuffer(nil) - for _, p := range strings.Split(*pkg, ".") { - result.WriteString(identifier(p, "")) - result.WriteString(sep) - } - - return strings.Trim(result.String(), sep) -} - -// create php identifier for class or message -func identifier(name string, suffix string) string { - name = Camelize(name) - if suffix != "" { - return name + Camelize(suffix) - } - - return name -} - -func resolveReserved(identifier string, pkg string) string { - if isReserved(strings.ToLower(identifier)) { - if pkg == ".google.protobuf" { - return "GPB" + identifier - } - return "PB" + identifier - } - - return identifier -} - -// Camelize "dino_party" -> "DinoParty" -func Camelize(word string) string { - words := splitAtCaseChangeWithTitlecase(word) - return strings.Join(words, "") -} - -func splitAtCaseChangeWithTitlecase(s string) []string { - words := make([]string, 0) - word := make([]rune, 0) - for _, c := range s { - spacer := isSpacerChar(c) - if len(word) > 0 { - if unicode.IsUpper(c) || spacer { - words = append(words, string(word)) - word = make([]rune, 0) - } - } - if !spacer { - if len(word) > 0 { - word = append(word, unicode.ToLower(c)) - } else { - word = append(word, unicode.ToUpper(c)) - } - } - } - words = append(words, string(word)) - return words -} - -func isSpacerChar(c rune) bool { - switch { - case c == rune("_"[0]): - return true - case c == rune(" "[0]): - return true - case c == rune(":"[0]): - return true - case c == rune("-"[0]): - return true - } - return false -} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go deleted file mode 100644 index c1dc3898..00000000 --- a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go +++ /dev/null @@ -1,103 +0,0 @@ -package php - -import ( - "bytes" - "fmt" - "strings" - - desc "google.golang.org/protobuf/types/descriptorpb" - plugin "google.golang.org/protobuf/types/pluginpb" -) - -// manages internal name representation of the package -type ns struct { - // Package defines file package. - Package string - - // Root namespace of the package - Namespace string - - // Import declares what namespaces to be imported - Import map[string]string -} - -// newNamespace creates new work namespace. -func newNamespace(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) *ns { - ns := &ns{ - Package: *file.Package, - Namespace: namespace(file.Package, "\\"), - Import: make(map[string]string), - } - - if file.Options != nil && file.Options.PhpNamespace != nil { - ns.Namespace = *file.Options.PhpNamespace - } - - for k := range service.Method { - ns.importMessage(req, service.Method[k].InputType) - ns.importMessage(req, service.Method[k].OutputType) - } - - return ns -} - -// importMessage registers new import message namespace (only the namespace). -func (ns *ns) importMessage(req *plugin.CodeGeneratorRequest, msg *string) { - if msg == nil { - return - } - - chunks := strings.Split(*msg, ".") - pkg := strings.Join(chunks[:len(chunks)-1], ".") - - result := bytes.NewBuffer(nil) - for _, p := range chunks[:len(chunks)-1] { - result.WriteString(identifier(p, "")) - result.WriteString(`\`) - } - - if pkg == "."+ns.Package { - // root package - return - } - - for _, f := range req.ProtoFile { - if pkg == "."+*f.Package { - if f.Options != nil && f.Options.PhpNamespace != nil { - // custom imported namespace - ns.Import[pkg] = *f.Options.PhpNamespace - return - } - } - } - - ns.Import[pkg] = strings.Trim(result.String(), `\`) -} - -// resolve message alias -func (ns *ns) resolve(msg *string) string { - chunks := strings.Split(*msg, ".") - pkg := strings.Join(chunks[:len(chunks)-1], ".") - - if pkg == "."+ns.Package { - // root message - return identifier(chunks[len(chunks)-1], "") - } - - for iPkg, ns := range ns.Import { - if pkg == iPkg { - // use last namespace chunk - nsChunks := strings.Split(ns, `\`) - identifier := identifier(chunks[len(chunks)-1], "") - - return fmt.Sprintf( - `%s\%s`, - nsChunks[len(nsChunks)-1], - resolveReserved(identifier, pkg), - ) - } - } - - // fully clarified name (fallback) - return "\\" + namespace(msg, "\\") -} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go deleted file mode 100644 index e00c6fdd..00000000 --- a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go +++ /dev/null @@ -1,103 +0,0 @@ -// MIT License -// -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package php - -import ( - "bytes" - "fmt" - "strings" - "text/template" - - desc "google.golang.org/protobuf/types/descriptorpb" - plugin "google.golang.org/protobuf/types/pluginpb" -) - -const phpBody = `<?php -# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! -# source: {{ .File.Name }} -{{ $ns := .Namespace -}} -{{if $ns.Namespace}} -namespace {{ $ns.Namespace }}; -{{end}} -use Spiral\GRPC; -{{- range $n := $ns.Import}} -use {{ $n }}; -{{- end}} - -interface {{ .Service.Name | interface }} extends GRPC\ServiceInterface -{ - // GRPC specific service name. - public const NAME = "{{ .File.Package }}.{{ .Service.Name }}";{{ "\n" }} -{{- range $m := .Service.Method}} - /** - * @param GRPC\ContextInterface $ctx - * @param {{ name $ns $m.InputType }} $in - * @return {{ name $ns $m.OutputType }} - * - * @throws GRPC\Exception\InvokeException - */ - public function {{ $m.Name }}(GRPC\ContextInterface $ctx, {{ name $ns $m.InputType }} $in): {{ name $ns $m.OutputType }}; -{{end -}} -} -` - -// generate php filename -func filename(file *desc.FileDescriptorProto, name *string) string { - ns := namespace(file.Package, "/") - if file.Options != nil && file.Options.PhpNamespace != nil { - ns = strings.ReplaceAll(*file.Options.PhpNamespace, `\`, `/`) - } - - return fmt.Sprintf("%s/%s.php", ns, identifier(*name, "interface")) -} - -// generate php file body -func body(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) string { - out := bytes.NewBuffer(nil) - - data := struct { - Namespace *ns - File *desc.FileDescriptorProto - Service *desc.ServiceDescriptorProto - }{ - Namespace: newNamespace(req, file, service), - File: file, - Service: service, - } - - tpl := template.Must(template.New("phpBody").Funcs(template.FuncMap{ - "interface": func(name *string) string { - return identifier(*name, "interface") - }, - "name": func(ns *ns, name *string) string { - return ns.resolve(name) - }, - }).Parse(phpBody)) - - err := tpl.Execute(out, data) - if err != nil { - panic(err) - } - - return out.String() -} diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go deleted file mode 100644 index 074aac85..00000000 --- a/plugins/grpc/proxy/proxy.go +++ /dev/null @@ -1,219 +0,0 @@ -package proxy - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" - "sync" - - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/grpc/codec" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" -) - -const ( - peerAddr string = ":peer.address" - peerAuthType string = ":peer.auth-type" - delimiter string = "|:|" -) - -// base interface for Proxy class -type proxyService interface { - // RegisterMethod registers new RPC method. - RegisterMethod(method string) - - // ServiceDesc returns service description for the proxy. - ServiceDesc() *grpc.ServiceDesc -} - -// carry details about service, method and RPC context to PHP process -type rpcContext struct { - Service string `json:"service"` - Method string `json:"method"` - Context map[string][]string `json:"context"` -} - -// Proxy manages GRPC/RoadRunner bridge. -type Proxy struct { - mu *sync.RWMutex - grpcPool pool.Pool - name string - metadata string - methods []string -} - -// NewProxy creates new service proxy object. -func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy { - return &Proxy{ - mu: mu, - grpcPool: grpcPool, - name: name, - metadata: metadata, - methods: make([]string, 0), - } -} - -// RegisterMethod registers new RPC method. -func (p *Proxy) RegisterMethod(method string) { - p.methods = append(p.methods, method) -} - -// ServiceDesc returns service description for the proxy. -func (p *Proxy) ServiceDesc() *grpc.ServiceDesc { - desc := &grpc.ServiceDesc{ - ServiceName: p.name, - Metadata: p.metadata, - HandlerType: (*proxyService)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{}, - } - - // Registering methods - for _, m := range p.methods { - desc.Methods = append(desc.Methods, grpc.MethodDesc{ - MethodName: m, - Handler: p.methodHandler(m), - }) - } - - return desc -} - -// Generate method handler proxy. -func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := codec.RawMessage{} - if err := dec(&in); err != nil { - return nil, wrapError(err) - } - - if interceptor == nil { - return p.invoke(ctx, method, in) - } - - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: fmt.Sprintf("/%s/%s", p.name, method), - } - - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return p.invoke(ctx, method, req.(codec.RawMessage)) - } - - return interceptor(ctx, in, info, handler) - } -} - -func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) { - payload, err := p.makePayload(ctx, method, in) - if err != nil { - return nil, err - } - - p.mu.RLock() - resp, err := p.grpcPool.Exec(payload) - p.mu.RUnlock() - - if err != nil { - return nil, wrapError(err) - } - - md, err := p.responseMetadata(resp) - if err != nil { - return nil, err - } - ctx = metadata.NewIncomingContext(ctx, md) - err = grpc.SetHeader(ctx, md) - if err != nil { - return nil, err - } - - return codec.RawMessage(resp.Body), nil -} - -// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD -func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) { - var md metadata.MD - if resp == nil || len(resp.Context) == 0 { - return md, nil - } - - var rpcMetadata map[string]string - err := json.Unmarshal(resp.Context, &rpcMetadata) - if err != nil { - return md, err - } - - if len(rpcMetadata) > 0 { - md = metadata.New(rpcMetadata) - } - - return md, nil -} - -// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error -func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) { - ctxMD := make(map[string][]string) - - if md, ok := metadata.FromIncomingContext(ctx); ok { - for k, v := range md { - ctxMD[k] = v - } - } - - if pr, ok := peer.FromContext(ctx); ok { - ctxMD[peerAddr] = []string{pr.Addr.String()} - if pr.AuthInfo != nil { - ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()} - } - } - - ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD}) - - if err != nil { - return nil, err - } - - return &payload.Payload{Context: ctxData, Body: body}, nil -} - -// mounts proper error code for the error -func wrapError(err error) error { - // internal agreement - if strings.Contains(err.Error(), delimiter) { - chunks := strings.Split(err.Error(), delimiter) - code := codes.Internal - - // protect the slice access - if len(chunks) < 2 { - return err - } - - if phpCode, errConv := strconv.ParseUint(chunks[0], 10, 32); errConv == nil { - code = codes.Code(phpCode) - } - - st := status.New(code, chunks[1]).Proto() - - for _, detailsMessage := range chunks[2:] { - anyDetailsMessage := anypb.Any{} - errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage) - if errP == nil { - st.Details = append(st.Details, &anyDetailsMessage) - } - } - - return status.ErrorProto(st) - } - - return status.Error(codes.Internal, err.Error()) -} diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go deleted file mode 100644 index 2c024ee3..00000000 --- a/plugins/grpc/proxy/proxy_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package proxy - -// import ( -// "testing" -// "time" - -// "github.com/sirupsen/logrus" -// "github.com/sirupsen/logrus/hooks/test" -// "github.com/stretchr/testify/assert" -// "golang.org/x/net/context" -// "google.golang.org/grpc" -// "google.golang.org/grpc/codes" -// "google.golang.org/grpc/metadata" -// "google.golang.org/grpc/status" -// ) - -// const addr = "localhost:9080" - -// func Test_Proxy_Error(t *testing.T) { -// logger, _ := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) - -// c := service.NewContainer(logger) -// c.Register(ID, &Service{}) - -// assert.NoError(t, c.Init(&testCfg{ -// grpcCfg: `{ -// "listen": "tcp://:9080", -// "tls": { -// "key": "tests/server.key", -// "cert": "tests/server.crt" -// }, -// "proto": "tests/test.proto", -// "workers":{ -// "command": "php tests/worker.php", -// "relay": "pipes", -// "pool": { -// "numWorkers": 1, -// "allocateTimeout": 10, -// "destroyTimeout": 10 -// } -// } -// }`, -// })) - -// s, st := c.Get(ID) -// assert.NotNil(t, s) -// assert.Equal(t, service.StatusOK, st) - -// // should do nothing -// s.(*Service).Stop() - -// go func() { assert.NoError(t, c.Serve()) }() -// time.Sleep(time.Millisecond * 100) -// defer c.Stop() - -// cl, cn := getClient(addr) -// defer cn.Close() - -// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"}) - -// assert.Error(t, err) -// se, _ := status.FromError(err) -// assert.Equal(t, "nothing here", se.Message()) -// assert.Equal(t, codes.NotFound, se.Code()) - -// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"}) - -// assert.Error(t, errWithDetails) -// statusWithDetails, _ := status.FromError(errWithDetails) -// assert.Equal(t, "main exception message", statusWithDetails.Message()) -// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code()) - -// details := statusWithDetails.Details() - -// detailsMessageForException := details[0].(*tests.DetailsMessageForException) - -// assert.Equal(t, detailsMessageForException.Code, uint64(1)) -// assert.Equal(t, detailsMessageForException.Message, "details message") -// } - -// func Test_Proxy_Metadata(t *testing.T) { -// logger, _ := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) - -// c := service.NewContainer(logger) -// c.Register(ID, &Service{}) - -// assert.NoError(t, c.Init(&testCfg{ -// grpcCfg: `{ -// "listen": "tcp://:9080", -// "tls": { -// "key": "tests/server.key", -// "cert": "tests/server.crt" -// }, -// "proto": "tests/test.proto", -// "workers":{ -// "command": "php tests/worker.php", -// "relay": "pipes", -// "pool": { -// "numWorkers": 1, -// "allocateTimeout": 10, -// "destroyTimeout": 10 -// } -// } -// }`, -// })) - -// s, st := c.Get(ID) -// assert.NotNil(t, s) -// assert.Equal(t, service.StatusOK, st) - -// // should do nothing -// s.(*Service).Stop() - -// go func() { assert.NoError(t, c.Serve()) }() -// time.Sleep(time.Millisecond * 100) -// defer c.Stop() - -// cl, cn := getClient(addr) -// defer cn.Close() - -// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value") -// var header metadata.MD -// out, err := cl.Info( -// ctx, -// &tests.Message{Msg: "MD"}, -// grpc.Header(&header), -// grpc.WaitForReady(true), -// ) -// assert.Equal(t, []string{"bar"}, header.Get("foo")) -// assert.NoError(t, err) -// assert.Equal(t, `["proxy-value"]`, out.Msg) -// } diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go deleted file mode 100644 index 323f73a0..00000000 --- a/plugins/grpc/server.go +++ /dev/null @@ -1,154 +0,0 @@ -package grpc - -import ( - "context" - "crypto/tls" - "crypto/x509" - "fmt" - "os" - "path" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/plugins/grpc/parser" - "github.com/spiral/roadrunner/v2/plugins/grpc/proxy" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" -) - -func (p *Plugin) createGRPCserver() (*grpc.Server, error) { - const op = errors.Op("grpc_plugin_create_server") - opts, err := p.serverOptions() - if err != nil { - return nil, errors.E(op, err) - } - - server := grpc.NewServer(opts...) - - if p.config.Proto != "" { - // php proxy services - services, err := parser.File(p.config.Proto, path.Dir(p.config.Proto)) - if err != nil { - return nil, err - } - - for _, service := range services { - p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool, p.mu) - for _, m := range service.Methods { - p.RegisterMethod(m.Name) - } - - server.RegisterService(p.ServiceDesc(), p) - } - } - - // external and native services - for _, r := range p.services { - r(server) - } - - return server, nil -} - -func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - start := time.Now() - resp, err := handler(ctx, req) - if err != nil { - p.events.Push(events.GRPCEvent{ - Event: events.EventUnaryCallErr, - Info: info, - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - - return nil, err - } - - p.events.Push(events.GRPCEvent{ - Event: events.EventUnaryCallOk, - Info: info, - Start: start, - Elapsed: time.Since(start), - }) - - return resp, nil -} - -func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) { - const op = errors.Op("grpc_plugin_server_options") - - var tcreds credentials.TransportCredentials - var opts []grpc.ServerOption - var cert tls.Certificate - var certPool *x509.CertPool - var rca []byte - var err error - - if p.config.EnableTLS() { - // if client CA is not empty we combine it with Cert and Key - if p.config.TLS.RootCA != "" { - cert, err = tls.LoadX509KeyPair(p.config.TLS.Cert, p.config.TLS.Key) - if err != nil { - return nil, err - } - - certPool, err = x509.SystemCertPool() - if err != nil { - return nil, err - } - if certPool == nil { - certPool = x509.NewCertPool() - } - - rca, err = os.ReadFile(p.config.TLS.RootCA) - if err != nil { - return nil, err - } - - if ok := certPool.AppendCertsFromPEM(rca); !ok { - return nil, errors.E(op, errors.Str("could not append Certs from PEM")) - } - - tcreds = credentials.NewTLS(&tls.Config{ - MinVersion: tls.VersionTLS12, - ClientAuth: tls.RequireAndVerifyClientCert, - Certificates: []tls.Certificate{cert}, - ClientCAs: certPool, - }) - } else { - tcreds, err = credentials.NewServerTLSFromFile(p.config.TLS.Cert, p.config.TLS.Key) - if err != nil { - return nil, err - } - } - - serverOptions := []grpc.ServerOption{ - grpc.MaxSendMsgSize(int(p.config.MaxSendMsgSize)), - grpc.MaxRecvMsgSize(int(p.config.MaxRecvMsgSize)), - grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: p.config.MaxConnectionIdle, - MaxConnectionAge: p.config.MaxConnectionAge, - MaxConnectionAgeGrace: p.config.MaxConnectionAge, - Time: p.config.PingTime, - Timeout: p.config.Timeout, - }), - grpc.MaxConcurrentStreams(uint32(p.config.MaxConcurrentStreams)), - } - - opts = append(opts, grpc.Creds(tcreds)) - opts = append(opts, serverOptions...) - } - - opts = append(opts, p.opts...) - - // custom codec is required to bypass protobuf, common interceptor used for debug and stats - return append( - opts, - grpc.UnaryInterceptor(p.interceptor), - // TODO(rustatian): check deprecation - // grpc.CustomCodec(&codec{encoding.GetCodec(encCodec)}), - ), nil -} |