summaryrefslogtreecommitdiff
path: root/plugins/grpc
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 21:24:13 +0300
committerGitHub <[email protected]>2021-09-16 21:24:13 +0300
commit337d292dd2d6ff0a555098b1970d8194d8df8bc2 (patch)
treea2ab31666f95813a592bea2b207f2db0ba188c92 /plugins/grpc
parent5d2cd55ab522d4f1e65a833f91146444465a32ac (diff)
parentcc56349f3ad19aa54ae7900c50e018d757305804 (diff)
[#783]: feat(grpc): update GRPC plugin to RR `v2`
[#783]: feat(grpc): update GRPC plugin to RR `v2`
Diffstat (limited to 'plugins/grpc')
-rw-r--r--plugins/grpc/codec/codec.go44
-rw-r--r--plugins/grpc/codec/codec_test.go79
-rw-r--r--plugins/grpc/config.go128
-rw-r--r--plugins/grpc/parser/message.proto7
-rw-r--r--plugins/grpc/parser/parse.go114
-rw-r--r--plugins/grpc/parser/parse_test.go71
-rw-r--r--plugins/grpc/parser/pong.proto10
-rw-r--r--plugins/grpc/parser/test.proto20
-rw-r--r--plugins/grpc/parser/test_import.proto12
-rw-r--r--plugins/grpc/parser/test_nested/message.proto7
-rw-r--r--plugins/grpc/parser/test_nested/pong.proto10
-rw-r--r--plugins/grpc/parser/test_nested/test_import.proto12
-rw-r--r--plugins/grpc/plugin.go195
-rw-r--r--plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go68
-rw-r--r--plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go57
-rw-r--r--plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go139
-rw-r--r--plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go103
-rw-r--r--plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go103
-rw-r--r--plugins/grpc/proxy/proxy.go219
-rw-r--r--plugins/grpc/proxy/proxy_test.go134
-rw-r--r--plugins/grpc/server.go154
21 files changed, 1686 insertions, 0 deletions
diff --git a/plugins/grpc/codec/codec.go b/plugins/grpc/codec/codec.go
new file mode 100644
index 00000000..a9d89ac5
--- /dev/null
+++ b/plugins/grpc/codec/codec.go
@@ -0,0 +1,44 @@
+package codec
+
+import "google.golang.org/grpc/encoding"
+
+type RawMessage []byte
+
+// By default, gRPC registers and uses the "proto" codec, so it is not necessary to do this in your own code to send and receive proto messages.
+// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec
+const cName string = "proto"
+const rm string = "rawMessage"
+
+func (r RawMessage) Reset() {}
+func (RawMessage) ProtoMessage() {}
+func (RawMessage) String() string { return rm }
+
+type Codec struct{ base encoding.Codec }
+
+// Marshal returns the wire format of v. rawMessages would be returned without encoding.
+func (c *Codec) Marshal(v interface{}) ([]byte, error) {
+ if raw, ok := v.(RawMessage); ok {
+ return raw, nil
+ }
+
+ return c.base.Marshal(v)
+}
+
+// Unmarshal parses the wire format into v. rawMessages would not be unmarshalled.
+func (c *Codec) Unmarshal(data []byte, v interface{}) error {
+ if raw, ok := v.(*RawMessage); ok {
+ *raw = data
+ return nil
+ }
+
+ return c.base.Unmarshal(data, v)
+}
+
+func (c *Codec) Name() string {
+ return cName
+}
+
+// String return codec name.
+func (c *Codec) String() string {
+ return "raw:" + c.base.Name()
+}
diff --git a/plugins/grpc/codec/codec_test.go b/plugins/grpc/codec/codec_test.go
new file mode 100644
index 00000000..60efb072
--- /dev/null
+++ b/plugins/grpc/codec/codec_test.go
@@ -0,0 +1,79 @@
+package codec
+
+import (
+ "testing"
+
+ json "github.com/json-iterator/go"
+ "github.com/stretchr/testify/assert"
+)
+
+type jsonCodec struct{}
+
+func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
+ return json.Marshal(v)
+}
+
+func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
+ return json.Unmarshal(data, v)
+}
+
+func (jsonCodec) Name() string {
+ return "json"
+}
+
+func TestCodec_String(t *testing.T) {
+ c := Codec{jsonCodec{}}
+
+ assert.Equal(t, "raw:json", c.String())
+
+ r := RawMessage{}
+ r.Reset()
+ r.ProtoMessage()
+ assert.Equal(t, "rawMessage", r.String())
+}
+
+func TestCodec_Unmarshal_ByPass(t *testing.T) {
+ c := Codec{jsonCodec{}}
+
+ s := struct {
+ Name string
+ }{}
+
+ assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s))
+ assert.Equal(t, "name", s.Name)
+}
+
+func TestCodec_Marshal_ByPass(t *testing.T) {
+ c := Codec{jsonCodec{}}
+
+ s := struct {
+ Name string
+ }{
+ Name: "name",
+ }
+
+ d, err := c.Marshal(s)
+ assert.NoError(t, err)
+
+ assert.Equal(t, `{"Name":"name"}`, string(d))
+}
+
+func TestCodec_Unmarshal_Raw(t *testing.T) {
+ c := Codec{jsonCodec{}}
+
+ s := RawMessage{}
+
+ assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s))
+ assert.Equal(t, `{"name":"name"}`, string(s))
+}
+
+func TestCodec_Marshal_Raw(t *testing.T) {
+ c := Codec{jsonCodec{}}
+
+ s := RawMessage(`{"Name":"name"}`)
+
+ d, err := c.Marshal(s)
+ assert.NoError(t, err)
+
+ assert.Equal(t, `{"Name":"name"}`, string(d))
+}
diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go
new file mode 100644
index 00000000..fedd4998
--- /dev/null
+++ b/plugins/grpc/config.go
@@ -0,0 +1,128 @@
+package grpc
+
+import (
+ "math"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+)
+
+type Config struct {
+ Listen string `mapstructure:"listen"`
+ Proto string `mapstructure:"proto"`
+
+ TLS *TLS
+
+ // Env is environment variables passed to the http pool
+ Env map[string]string `mapstructure:"env"`
+
+ GrpcPool *pool.Config `mapstructure:"pool"`
+ MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"`
+ MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"`
+ MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"`
+ MaxConnectionAge time.Duration `mapstructure:"max_connection_age"`
+ MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace"`
+ MaxConcurrentStreams int64 `mapstructure:"max_concurrent_streams"`
+ PingTime time.Duration `mapstructure:"ping_time"`
+ Timeout time.Duration `mapstructure:"timeout"`
+}
+
+type TLS struct {
+ Key string
+ Cert string
+ RootCA string
+}
+
+func (c *Config) InitDefaults() error { //nolint:gocognit
+ const op = errors.Op("grpc_plugin_config")
+ if c.GrpcPool == nil {
+ c.GrpcPool = &pool.Config{}
+ }
+
+ c.GrpcPool.InitDefaults()
+
+ if !strings.Contains(c.Listen, ":") {
+ return errors.E(op, errors.Errorf("mailformed grpc grpc address, provided: %s", c.Listen))
+ }
+
+ if c.EnableTLS() {
+ if _, err := os.Stat(c.TLS.Key); err != nil {
+ if os.IsNotExist(err) {
+ return errors.E(op, errors.Errorf("key file '%s' does not exists", c.TLS.Key))
+ }
+
+ return errors.E(op, err)
+ }
+
+ if _, err := os.Stat(c.TLS.Cert); err != nil {
+ if os.IsNotExist(err) {
+ return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert))
+ }
+
+ return errors.E(op, err)
+ }
+
+ // RootCA is optional, but if provided - check it
+ if c.TLS.RootCA != "" {
+ if _, err := os.Stat(c.TLS.RootCA); err != nil {
+ if os.IsNotExist(err) {
+ return errors.E(op, errors.Errorf("root ca path provided, but key file '%s' does not exists", c.TLS.RootCA))
+ }
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ // used to set max time
+ infinity := time.Duration(math.MaxInt64)
+
+ if c.PingTime == 0 {
+ c.PingTime = time.Hour * 2
+ }
+
+ if c.Timeout == 0 {
+ c.Timeout = time.Second * 20
+ }
+
+ if c.MaxConcurrentStreams == 0 {
+ c.MaxConcurrentStreams = 10
+ }
+ // set default
+ if c.MaxConnectionAge == 0 {
+ c.MaxConnectionAge = infinity
+ }
+
+ // set default
+ if c.MaxConnectionIdle == 0 {
+ c.MaxConnectionIdle = infinity
+ }
+
+ if c.MaxConnectionAgeGrace == 0 {
+ c.MaxConnectionAgeGrace = infinity
+ }
+
+ if c.MaxRecvMsgSize == 0 {
+ c.MaxRecvMsgSize = 1024 * 1024 * 50
+ } else {
+ c.MaxRecvMsgSize = 1024 * 1024 * c.MaxRecvMsgSize
+ }
+
+ if c.MaxSendMsgSize == 0 {
+ c.MaxSendMsgSize = 1024 * 1024 * 50
+ } else {
+ c.MaxSendMsgSize = 1024 * 1024 * c.MaxSendMsgSize
+ }
+
+ return nil
+}
+
+func (c *Config) EnableTLS() bool {
+ if c.TLS != nil {
+ return (c.TLS.RootCA != "" && c.TLS.Key != "" && c.TLS.Cert != "") || (c.TLS.Key != "" && c.TLS.Cert != "")
+ }
+
+ return false
+}
diff --git a/plugins/grpc/parser/message.proto b/plugins/grpc/parser/message.proto
new file mode 100644
index 00000000..a4012010
--- /dev/null
+++ b/plugins/grpc/parser/message.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+package app.namespace;
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/parse.go b/plugins/grpc/parser/parse.go
new file mode 100644
index 00000000..d59b0927
--- /dev/null
+++ b/plugins/grpc/parser/parse.go
@@ -0,0 +1,114 @@
+package parser
+
+import (
+ "bytes"
+ "io"
+ "os"
+
+ pp "github.com/emicklei/proto"
+)
+
+// Service contains information about singular GRPC service.
+type Service struct {
+ // Package defines service namespace.
+ Package string
+
+ // Name defines service name.
+ Name string
+
+ // Methods list.
+ Methods []Method
+}
+
+// Method describes singular RPC method.
+type Method struct {
+ // Name is method name.
+ Name string
+
+ // StreamsRequest defines if method accept stream input.
+ StreamsRequest bool
+
+ // RequestType defines message name (from the same package) of method input.
+ RequestType string
+
+ // StreamsReturns defines if method streams result.
+ StreamsReturns bool
+
+ // ReturnsType defines message name (from the same package) of method return value.
+ ReturnsType string
+}
+
+// File parses given proto file or returns error.
+func File(file string, importPath string) ([]Service, error) {
+ reader, _ := os.Open(file)
+ defer reader.Close()
+
+ return parse(reader, importPath)
+}
+
+// Bytes parses string into proto definition.
+func Bytes(data []byte) ([]Service, error) {
+ return parse(bytes.NewBuffer(data), "")
+}
+
+func parse(reader io.Reader, importPath string) ([]Service, error) {
+ proto, err := pp.NewParser(reader).Parse()
+ if err != nil {
+ return nil, err
+ }
+
+ return parseServices(
+ proto,
+ parsePackage(proto),
+ importPath,
+ )
+}
+
+func parsePackage(proto *pp.Proto) string {
+ for _, e := range proto.Elements {
+ if p, ok := e.(*pp.Package); ok {
+ return p.Name
+ }
+ }
+
+ return ""
+}
+
+func parseServices(proto *pp.Proto, pkg string, importPath string) ([]Service, error) {
+ services := make([]Service, 0)
+
+ pp.Walk(proto, pp.WithService(func(service *pp.Service) {
+ services = append(services, Service{
+ Package: pkg,
+ Name: service.Name,
+ Methods: parseMethods(service),
+ })
+ }))
+
+ pp.Walk(proto, func(v pp.Visitee) {
+ if i, ok := v.(*pp.Import); ok {
+ if im, err := File(importPath+"/"+i.Filename, importPath); err == nil {
+ services = append(services, im...)
+ }
+ }
+ })
+
+ return services, nil
+}
+
+func parseMethods(s *pp.Service) []Method {
+ methods := make([]Method, 0)
+ for _, e := range s.Elements {
+ if m, ok := e.(*pp.RPC); ok {
+ methods = append(methods, Method{
+ Name: m.Name,
+ StreamsRequest: m.StreamsRequest,
+ RequestType: m.RequestType,
+ StreamsReturns: m.StreamsReturns,
+ ReturnsType: m.ReturnsType,
+ })
+ }
+ }
+
+ return methods
+}
diff --git a/plugins/grpc/parser/parse_test.go b/plugins/grpc/parser/parse_test.go
new file mode 100644
index 00000000..b71c133d
--- /dev/null
+++ b/plugins/grpc/parser/parse_test.go
@@ -0,0 +1,71 @@
+package parser
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestParseFile(t *testing.T) {
+ services, err := File("test.proto", "")
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
+
+func TestParseFileWithImportsNestedFolder(t *testing.T) {
+ services, err := File("./test_nested/test_import.proto", "./test_nested")
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
+
+func TestParseFileWithImports(t *testing.T) {
+ services, err := File("test_import.proto", ".")
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
+
+func TestParseNotFound(t *testing.T) {
+ _, err := File("test2.proto", "")
+ assert.Error(t, err)
+}
+
+func TestParseBytes(t *testing.T) {
+ services, err := Bytes([]byte{})
+ assert.NoError(t, err)
+ assert.Len(t, services, 0)
+}
+
+func TestParseString(t *testing.T) {
+ services, err := Bytes([]byte(`
+syntax = "proto3";
+package app.namespace;
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+}
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+}
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+}
+`))
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
diff --git a/plugins/grpc/parser/pong.proto b/plugins/grpc/parser/pong.proto
new file mode 100644
index 00000000..9756fabe
--- /dev/null
+++ b/plugins/grpc/parser/pong.proto
@@ -0,0 +1,10 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test.proto b/plugins/grpc/parser/test.proto
new file mode 100644
index 00000000..e2230954
--- /dev/null
+++ b/plugins/grpc/parser/test.proto
@@ -0,0 +1,20 @@
+syntax = "proto3";
+package app.namespace;
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+}
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+}
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_import.proto b/plugins/grpc/parser/test_import.proto
new file mode 100644
index 00000000..1b954fc1
--- /dev/null
+++ b/plugins/grpc/parser/test_import.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+import "pong.proto";
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_nested/message.proto b/plugins/grpc/parser/test_nested/message.proto
new file mode 100644
index 00000000..a4012010
--- /dev/null
+++ b/plugins/grpc/parser/test_nested/message.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+package app.namespace;
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_nested/pong.proto b/plugins/grpc/parser/test_nested/pong.proto
new file mode 100644
index 00000000..9756fabe
--- /dev/null
+++ b/plugins/grpc/parser/test_nested/pong.proto
@@ -0,0 +1,10 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_nested/test_import.proto b/plugins/grpc/parser/test_nested/test_import.proto
new file mode 100644
index 00000000..a3a476ba
--- /dev/null
+++ b/plugins/grpc/parser/test_nested/test_import.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+import "pong.proto";
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+}
diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go
new file mode 100644
index 00000000..7518d352
--- /dev/null
+++ b/plugins/grpc/plugin.go
@@ -0,0 +1,195 @@
+package grpc
+
+import (
+ "context"
+ "sync"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/codec"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/encoding"
+)
+
+const (
+ name string = "grpc"
+ RrGrpc string = "RR_GRPC"
+)
+
+type Plugin struct {
+ mu *sync.RWMutex
+ config *Config
+ gPool pool.Pool
+ opts []grpc.ServerOption
+ services []func(server *grpc.Server)
+ server *grpc.Server
+ rrServer server.Server
+
+ // events handler
+ events events.Handler
+ log logger.Logger
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+ const op = errors.Op("grpc_plugin_init")
+
+ if !cfg.Has(name) {
+ return errors.E(errors.Disabled)
+ }
+ // register the codec
+ encoding.RegisterCodec(&codec.Codec{})
+
+ err := cfg.UnmarshalKey(name, &p.config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.config.InitDefaults()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.opts = make([]grpc.ServerOption, 0)
+ p.services = make([]func(server *grpc.Server), 0)
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(p.collectGRPCEvents)
+ p.rrServer = server
+
+ // worker's GRPC mode
+ if p.config.Env == nil {
+ p.config.Env = make(map[string]string)
+ }
+ p.config.Env[RrGrpc] = "true"
+
+ p.log = log
+ p.mu = &sync.RWMutex{}
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("grpc_plugin_serve")
+ errCh := make(chan error, 1)
+
+ var err error
+ p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{
+ Debug: p.config.GrpcPool.Debug,
+ NumWorkers: p.config.GrpcPool.NumWorkers,
+ MaxJobs: p.config.GrpcPool.MaxJobs,
+ AllocateTimeout: p.config.GrpcPool.AllocateTimeout,
+ DestroyTimeout: p.config.GrpcPool.DestroyTimeout,
+ Supervisor: p.config.GrpcPool.Supervisor,
+ }, p.config.Env, p.collectGRPCEvents)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ go func() {
+ var err error
+ p.mu.Lock()
+ p.server, err = p.createGRPCserver()
+ if err != nil {
+ p.log.Error("create grpc server", "error", err)
+ errCh <- errors.E(op, err)
+ return
+ }
+
+ l, err := utils.CreateListener(p.config.Listen)
+ if err != nil {
+ p.log.Error("create grpc listener", "error", err)
+ errCh <- errors.E(op, err)
+ }
+
+ // protect serve
+ p.mu.Unlock()
+ err = p.server.Serve(l)
+ if err != nil {
+ // skip errors when stopping the server
+ if err == grpc.ErrServerStopped {
+ return
+ }
+
+ p.log.Error("grpc server stopped", "error", err)
+ errCh <- errors.E(op, err)
+ return
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.server != nil {
+ p.server.Stop()
+ }
+ return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return name
+}
+
+func (p *Plugin) Reset() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ const op = errors.Op("grpc_plugin_reset")
+
+ // destroy old pool
+ p.gPool.Destroy(context.Background())
+
+ var err error
+ p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{
+ Debug: p.config.GrpcPool.Debug,
+ NumWorkers: p.config.GrpcPool.NumWorkers,
+ MaxJobs: p.config.GrpcPool.MaxJobs,
+ AllocateTimeout: p.config.GrpcPool.AllocateTimeout,
+ DestroyTimeout: p.config.GrpcPool.DestroyTimeout,
+ Supervisor: p.config.GrpcPool.Supervisor,
+ }, p.config.Env, p.collectGRPCEvents)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (p *Plugin) Workers() []*process.State {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ workers := p.gPool.Workers()
+
+ ps := make([]*process.State, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
+}
+
+func (p *Plugin) collectGRPCEvents(event interface{}) {
+ if gev, ok := event.(events.GRPCEvent); ok {
+ switch gev.Event {
+ case events.EventUnaryCallOk:
+ p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed)
+ case events.EventUnaryCallErr:
+ p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed)
+ }
+ }
+}
diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go
new file mode 100644
index 00000000..0894a7a8
--- /dev/null
+++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go
@@ -0,0 +1,68 @@
+// MIT License
+//
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package main
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+
+ "github.com/spiral/roadrunner/v2/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php"
+ "google.golang.org/protobuf/proto"
+ plugin "google.golang.org/protobuf/types/pluginpb"
+)
+
+func main() {
+ req, err := readRequest(os.Stdin)
+ if err != nil {
+ panic(err)
+ }
+
+ if err = writeResponse(os.Stdout, php.Generate(req)); err != nil {
+ panic(err)
+ }
+}
+
+func readRequest(in io.Reader) (*plugin.CodeGeneratorRequest, error) {
+ data, err := ioutil.ReadAll(in)
+ if err != nil {
+ return nil, err
+ }
+
+ req := new(plugin.CodeGeneratorRequest)
+ if err = proto.Unmarshal(data, req); err != nil {
+ return nil, err
+ }
+
+ return req, nil
+}
+
+func writeResponse(out io.Writer, resp *plugin.CodeGeneratorResponse) error {
+ data, err := proto.Marshal(resp)
+ if err != nil {
+ return err
+ }
+
+ _, err = out.Write(data)
+ return err
+}
diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go
new file mode 100644
index 00000000..03c48ac8
--- /dev/null
+++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go
@@ -0,0 +1,57 @@
+// MIT License
+//
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package php
+
+import (
+ desc "google.golang.org/protobuf/types/descriptorpb"
+ plugin "google.golang.org/protobuf/types/pluginpb"
+)
+
+// Generate generates needed service classes
+func Generate(req *plugin.CodeGeneratorRequest) *plugin.CodeGeneratorResponse {
+ resp := &plugin.CodeGeneratorResponse{}
+
+ for _, file := range req.ProtoFile {
+ for _, service := range file.Service {
+ resp.File = append(resp.File, generate(req, file, service))
+ }
+ }
+
+ return resp
+}
+
+func generate(
+ req *plugin.CodeGeneratorRequest,
+ file *desc.FileDescriptorProto,
+ service *desc.ServiceDescriptorProto,
+) *plugin.CodeGeneratorResponse_File {
+ return &plugin.CodeGeneratorResponse_File{
+ Name: str(filename(file, service.Name)),
+ Content: str(body(req, file, service)),
+ }
+}
+
+// helper to convert string into string pointer
+func str(str string) *string {
+ return &str
+}
diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go
new file mode 100644
index 00000000..32579e33
--- /dev/null
+++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go
@@ -0,0 +1,139 @@
+// MIT License
+//
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package php
+
+import (
+ "bytes"
+ "strings"
+ "unicode"
+)
+
+// @see https://github.com/protocolbuffers/protobuf/blob/master/php/ext/google/protobuf/protobuf.c#L168
+var reservedKeywords = []string{
+ "abstract", "and", "array", "as", "break",
+ "callable", "case", "catch", "class", "clone",
+ "const", "continue", "declare", "default", "die",
+ "do", "echo", "else", "elseif", "empty",
+ "enddeclare", "endfor", "endforeach", "endif", "endswitch",
+ "endwhile", "eval", "exit", "extends", "final",
+ "for", "foreach", "function", "global", "goto",
+ "if", "implements", "include", "include_once", "instanceof",
+ "insteadof", "interface", "isset", "list", "namespace",
+ "new", "or", "print", "private", "protected",
+ "public", "require", "require_once", "return", "static",
+ "switch", "throw", "trait", "try", "unset",
+ "use", "var", "while", "xor", "int",
+ "float", "bool", "string", "true", "false",
+ "null", "void", "iterable",
+}
+
+// Check if given name/keyword is reserved by php.
+func isReserved(name string) bool {
+ name = strings.ToLower(name)
+ for _, k := range reservedKeywords {
+ if name == k {
+ return true
+ }
+ }
+
+ return false
+}
+
+// generate php namespace or path
+func namespace(pkg *string, sep string) string {
+ if pkg == nil {
+ return ""
+ }
+
+ result := bytes.NewBuffer(nil)
+ for _, p := range strings.Split(*pkg, ".") {
+ result.WriteString(identifier(p, ""))
+ result.WriteString(sep)
+ }
+
+ return strings.Trim(result.String(), sep)
+}
+
+// create php identifier for class or message
+func identifier(name string, suffix string) string {
+ name = Camelize(name)
+ if suffix != "" {
+ return name + Camelize(suffix)
+ }
+
+ return name
+}
+
+func resolveReserved(identifier string, pkg string) string {
+ if isReserved(strings.ToLower(identifier)) {
+ if pkg == ".google.protobuf" {
+ return "GPB" + identifier
+ }
+ return "PB" + identifier
+ }
+
+ return identifier
+}
+
+// Camelize "dino_party" -> "DinoParty"
+func Camelize(word string) string {
+ words := splitAtCaseChangeWithTitlecase(word)
+ return strings.Join(words, "")
+}
+
+func splitAtCaseChangeWithTitlecase(s string) []string {
+ words := make([]string, 0)
+ word := make([]rune, 0)
+ for _, c := range s {
+ spacer := isSpacerChar(c)
+ if len(word) > 0 {
+ if unicode.IsUpper(c) || spacer {
+ words = append(words, string(word))
+ word = make([]rune, 0)
+ }
+ }
+ if !spacer {
+ if len(word) > 0 {
+ word = append(word, unicode.ToLower(c))
+ } else {
+ word = append(word, unicode.ToUpper(c))
+ }
+ }
+ }
+ words = append(words, string(word))
+ return words
+}
+
+func isSpacerChar(c rune) bool {
+ switch {
+ case c == rune("_"[0]):
+ return true
+ case c == rune(" "[0]):
+ return true
+ case c == rune(":"[0]):
+ return true
+ case c == rune("-"[0]):
+ return true
+ }
+ return false
+}
diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go
new file mode 100644
index 00000000..c1dc3898
--- /dev/null
+++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go
@@ -0,0 +1,103 @@
+package php
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+
+ desc "google.golang.org/protobuf/types/descriptorpb"
+ plugin "google.golang.org/protobuf/types/pluginpb"
+)
+
+// manages internal name representation of the package
+type ns struct {
+ // Package defines file package.
+ Package string
+
+ // Root namespace of the package
+ Namespace string
+
+ // Import declares what namespaces to be imported
+ Import map[string]string
+}
+
+// newNamespace creates new work namespace.
+func newNamespace(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) *ns {
+ ns := &ns{
+ Package: *file.Package,
+ Namespace: namespace(file.Package, "\\"),
+ Import: make(map[string]string),
+ }
+
+ if file.Options != nil && file.Options.PhpNamespace != nil {
+ ns.Namespace = *file.Options.PhpNamespace
+ }
+
+ for k := range service.Method {
+ ns.importMessage(req, service.Method[k].InputType)
+ ns.importMessage(req, service.Method[k].OutputType)
+ }
+
+ return ns
+}
+
+// importMessage registers new import message namespace (only the namespace).
+func (ns *ns) importMessage(req *plugin.CodeGeneratorRequest, msg *string) {
+ if msg == nil {
+ return
+ }
+
+ chunks := strings.Split(*msg, ".")
+ pkg := strings.Join(chunks[:len(chunks)-1], ".")
+
+ result := bytes.NewBuffer(nil)
+ for _, p := range chunks[:len(chunks)-1] {
+ result.WriteString(identifier(p, ""))
+ result.WriteString(`\`)
+ }
+
+ if pkg == "."+ns.Package {
+ // root package
+ return
+ }
+
+ for _, f := range req.ProtoFile {
+ if pkg == "."+*f.Package {
+ if f.Options != nil && f.Options.PhpNamespace != nil {
+ // custom imported namespace
+ ns.Import[pkg] = *f.Options.PhpNamespace
+ return
+ }
+ }
+ }
+
+ ns.Import[pkg] = strings.Trim(result.String(), `\`)
+}
+
+// resolve message alias
+func (ns *ns) resolve(msg *string) string {
+ chunks := strings.Split(*msg, ".")
+ pkg := strings.Join(chunks[:len(chunks)-1], ".")
+
+ if pkg == "."+ns.Package {
+ // root message
+ return identifier(chunks[len(chunks)-1], "")
+ }
+
+ for iPkg, ns := range ns.Import {
+ if pkg == iPkg {
+ // use last namespace chunk
+ nsChunks := strings.Split(ns, `\`)
+ identifier := identifier(chunks[len(chunks)-1], "")
+
+ return fmt.Sprintf(
+ `%s\%s`,
+ nsChunks[len(nsChunks)-1],
+ resolveReserved(identifier, pkg),
+ )
+ }
+ }
+
+ // fully clarified name (fallback)
+ return "\\" + namespace(msg, "\\")
+}
diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go
new file mode 100644
index 00000000..e00c6fdd
--- /dev/null
+++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go
@@ -0,0 +1,103 @@
+// MIT License
+//
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package php
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "text/template"
+
+ desc "google.golang.org/protobuf/types/descriptorpb"
+ plugin "google.golang.org/protobuf/types/pluginpb"
+)
+
+const phpBody = `<?php
+# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT!
+# source: {{ .File.Name }}
+{{ $ns := .Namespace -}}
+{{if $ns.Namespace}}
+namespace {{ $ns.Namespace }};
+{{end}}
+use Spiral\GRPC;
+{{- range $n := $ns.Import}}
+use {{ $n }};
+{{- end}}
+
+interface {{ .Service.Name | interface }} extends GRPC\ServiceInterface
+{
+ // GRPC specific service name.
+ public const NAME = "{{ .File.Package }}.{{ .Service.Name }}";{{ "\n" }}
+{{- range $m := .Service.Method}}
+ /**
+ * @param GRPC\ContextInterface $ctx
+ * @param {{ name $ns $m.InputType }} $in
+ * @return {{ name $ns $m.OutputType }}
+ *
+ * @throws GRPC\Exception\InvokeException
+ */
+ public function {{ $m.Name }}(GRPC\ContextInterface $ctx, {{ name $ns $m.InputType }} $in): {{ name $ns $m.OutputType }};
+{{end -}}
+}
+`
+
+// generate php filename
+func filename(file *desc.FileDescriptorProto, name *string) string {
+ ns := namespace(file.Package, "/")
+ if file.Options != nil && file.Options.PhpNamespace != nil {
+ ns = strings.ReplaceAll(*file.Options.PhpNamespace, `\`, `/`)
+ }
+
+ return fmt.Sprintf("%s/%s.php", ns, identifier(*name, "interface"))
+}
+
+// generate php file body
+func body(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) string {
+ out := bytes.NewBuffer(nil)
+
+ data := struct {
+ Namespace *ns
+ File *desc.FileDescriptorProto
+ Service *desc.ServiceDescriptorProto
+ }{
+ Namespace: newNamespace(req, file, service),
+ File: file,
+ Service: service,
+ }
+
+ tpl := template.Must(template.New("phpBody").Funcs(template.FuncMap{
+ "interface": func(name *string) string {
+ return identifier(*name, "interface")
+ },
+ "name": func(ns *ns, name *string) string {
+ return ns.resolve(name)
+ },
+ }).Parse(phpBody))
+
+ err := tpl.Execute(out, data)
+ if err != nil {
+ panic(err)
+ }
+
+ return out.String()
+}
diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go
new file mode 100644
index 00000000..074aac85
--- /dev/null
+++ b/plugins/grpc/proxy/proxy.go
@@ -0,0 +1,219 @@
+package proxy
+
+import (
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/codec"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/anypb"
+)
+
+const (
+ peerAddr string = ":peer.address"
+ peerAuthType string = ":peer.auth-type"
+ delimiter string = "|:|"
+)
+
+// base interface for Proxy class
+type proxyService interface {
+ // RegisterMethod registers new RPC method.
+ RegisterMethod(method string)
+
+ // ServiceDesc returns service description for the proxy.
+ ServiceDesc() *grpc.ServiceDesc
+}
+
+// carry details about service, method and RPC context to PHP process
+type rpcContext struct {
+ Service string `json:"service"`
+ Method string `json:"method"`
+ Context map[string][]string `json:"context"`
+}
+
+// Proxy manages GRPC/RoadRunner bridge.
+type Proxy struct {
+ mu *sync.RWMutex
+ grpcPool pool.Pool
+ name string
+ metadata string
+ methods []string
+}
+
+// NewProxy creates new service proxy object.
+func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy {
+ return &Proxy{
+ mu: mu,
+ grpcPool: grpcPool,
+ name: name,
+ metadata: metadata,
+ methods: make([]string, 0),
+ }
+}
+
+// RegisterMethod registers new RPC method.
+func (p *Proxy) RegisterMethod(method string) {
+ p.methods = append(p.methods, method)
+}
+
+// ServiceDesc returns service description for the proxy.
+func (p *Proxy) ServiceDesc() *grpc.ServiceDesc {
+ desc := &grpc.ServiceDesc{
+ ServiceName: p.name,
+ Metadata: p.metadata,
+ HandlerType: (*proxyService)(nil),
+ Methods: []grpc.MethodDesc{},
+ Streams: []grpc.StreamDesc{},
+ }
+
+ // Registering methods
+ for _, m := range p.methods {
+ desc.Methods = append(desc.Methods, grpc.MethodDesc{
+ MethodName: m,
+ Handler: p.methodHandler(m),
+ })
+ }
+
+ return desc
+}
+
+// Generate method handler proxy.
+func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := codec.RawMessage{}
+ if err := dec(&in); err != nil {
+ return nil, wrapError(err)
+ }
+
+ if interceptor == nil {
+ return p.invoke(ctx, method, in)
+ }
+
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: fmt.Sprintf("/%s/%s", p.name, method),
+ }
+
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return p.invoke(ctx, method, req.(codec.RawMessage))
+ }
+
+ return interceptor(ctx, in, info, handler)
+ }
+}
+
+func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) {
+ payload, err := p.makePayload(ctx, method, in)
+ if err != nil {
+ return nil, err
+ }
+
+ p.mu.RLock()
+ resp, err := p.grpcPool.Exec(payload)
+ p.mu.RUnlock()
+
+ if err != nil {
+ return nil, wrapError(err)
+ }
+
+ md, err := p.responseMetadata(resp)
+ if err != nil {
+ return nil, err
+ }
+ ctx = metadata.NewIncomingContext(ctx, md)
+ err = grpc.SetHeader(ctx, md)
+ if err != nil {
+ return nil, err
+ }
+
+ return codec.RawMessage(resp.Body), nil
+}
+
+// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD
+func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) {
+ var md metadata.MD
+ if resp == nil || len(resp.Context) == 0 {
+ return md, nil
+ }
+
+ var rpcMetadata map[string]string
+ err := json.Unmarshal(resp.Context, &rpcMetadata)
+ if err != nil {
+ return md, err
+ }
+
+ if len(rpcMetadata) > 0 {
+ md = metadata.New(rpcMetadata)
+ }
+
+ return md, nil
+}
+
+// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error
+func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) {
+ ctxMD := make(map[string][]string)
+
+ if md, ok := metadata.FromIncomingContext(ctx); ok {
+ for k, v := range md {
+ ctxMD[k] = v
+ }
+ }
+
+ if pr, ok := peer.FromContext(ctx); ok {
+ ctxMD[peerAddr] = []string{pr.Addr.String()}
+ if pr.AuthInfo != nil {
+ ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()}
+ }
+ }
+
+ ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD})
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &payload.Payload{Context: ctxData, Body: body}, nil
+}
+
+// mounts proper error code for the error
+func wrapError(err error) error {
+ // internal agreement
+ if strings.Contains(err.Error(), delimiter) {
+ chunks := strings.Split(err.Error(), delimiter)
+ code := codes.Internal
+
+ // protect the slice access
+ if len(chunks) < 2 {
+ return err
+ }
+
+ if phpCode, errConv := strconv.ParseUint(chunks[0], 10, 32); errConv == nil {
+ code = codes.Code(phpCode)
+ }
+
+ st := status.New(code, chunks[1]).Proto()
+
+ for _, detailsMessage := range chunks[2:] {
+ anyDetailsMessage := anypb.Any{}
+ errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage)
+ if errP == nil {
+ st.Details = append(st.Details, &anyDetailsMessage)
+ }
+ }
+
+ return status.ErrorProto(st)
+ }
+
+ return status.Error(codes.Internal, err.Error())
+}
diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go
new file mode 100644
index 00000000..2c024ee3
--- /dev/null
+++ b/plugins/grpc/proxy/proxy_test.go
@@ -0,0 +1,134 @@
+package proxy
+
+// import (
+// "testing"
+// "time"
+
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/stretchr/testify/assert"
+// "golang.org/x/net/context"
+// "google.golang.org/grpc"
+// "google.golang.org/grpc/codes"
+// "google.golang.org/grpc/metadata"
+// "google.golang.org/grpc/status"
+// )
+
+// const addr = "localhost:9080"
+
+// func Test_Proxy_Error(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+
+// assert.NoError(t, c.Init(&testCfg{
+// grpcCfg: `{
+// "listen": "tcp://:9080",
+// "tls": {
+// "key": "tests/server.key",
+// "cert": "tests/server.crt"
+// },
+// "proto": "tests/test.proto",
+// "workers":{
+// "command": "php tests/worker.php",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10,
+// "destroyTimeout": 10
+// }
+// }
+// }`,
+// }))
+
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+
+// // should do nothing
+// s.(*Service).Stop()
+
+// go func() { assert.NoError(t, c.Serve()) }()
+// time.Sleep(time.Millisecond * 100)
+// defer c.Stop()
+
+// cl, cn := getClient(addr)
+// defer cn.Close()
+
+// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"})
+
+// assert.Error(t, err)
+// se, _ := status.FromError(err)
+// assert.Equal(t, "nothing here", se.Message())
+// assert.Equal(t, codes.NotFound, se.Code())
+
+// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"})
+
+// assert.Error(t, errWithDetails)
+// statusWithDetails, _ := status.FromError(errWithDetails)
+// assert.Equal(t, "main exception message", statusWithDetails.Message())
+// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code())
+
+// details := statusWithDetails.Details()
+
+// detailsMessageForException := details[0].(*tests.DetailsMessageForException)
+
+// assert.Equal(t, detailsMessageForException.Code, uint64(1))
+// assert.Equal(t, detailsMessageForException.Message, "details message")
+// }
+
+// func Test_Proxy_Metadata(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+
+// assert.NoError(t, c.Init(&testCfg{
+// grpcCfg: `{
+// "listen": "tcp://:9080",
+// "tls": {
+// "key": "tests/server.key",
+// "cert": "tests/server.crt"
+// },
+// "proto": "tests/test.proto",
+// "workers":{
+// "command": "php tests/worker.php",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10,
+// "destroyTimeout": 10
+// }
+// }
+// }`,
+// }))
+
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+
+// // should do nothing
+// s.(*Service).Stop()
+
+// go func() { assert.NoError(t, c.Serve()) }()
+// time.Sleep(time.Millisecond * 100)
+// defer c.Stop()
+
+// cl, cn := getClient(addr)
+// defer cn.Close()
+
+// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value")
+// var header metadata.MD
+// out, err := cl.Info(
+// ctx,
+// &tests.Message{Msg: "MD"},
+// grpc.Header(&header),
+// grpc.WaitForReady(true),
+// )
+// assert.Equal(t, []string{"bar"}, header.Get("foo"))
+// assert.NoError(t, err)
+// assert.Equal(t, `["proxy-value"]`, out.Msg)
+// }
diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go
new file mode 100644
index 00000000..323f73a0
--- /dev/null
+++ b/plugins/grpc/server.go
@@ -0,0 +1,154 @@
+package grpc
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "os"
+ "path"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/parser"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/proxy"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/keepalive"
+)
+
+func (p *Plugin) createGRPCserver() (*grpc.Server, error) {
+ const op = errors.Op("grpc_plugin_create_server")
+ opts, err := p.serverOptions()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ server := grpc.NewServer(opts...)
+
+ if p.config.Proto != "" {
+ // php proxy services
+ services, err := parser.File(p.config.Proto, path.Dir(p.config.Proto))
+ if err != nil {
+ return nil, err
+ }
+
+ for _, service := range services {
+ p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool, p.mu)
+ for _, m := range service.Methods {
+ p.RegisterMethod(m.Name)
+ }
+
+ server.RegisterService(p.ServiceDesc(), p)
+ }
+ }
+
+ // external and native services
+ for _, r := range p.services {
+ r(server)
+ }
+
+ return server, nil
+}
+
+func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ start := time.Now()
+ resp, err := handler(ctx, req)
+ if err != nil {
+ p.events.Push(events.GRPCEvent{
+ Event: events.EventUnaryCallErr,
+ Info: info,
+ Error: err,
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
+ return nil, err
+ }
+
+ p.events.Push(events.GRPCEvent{
+ Event: events.EventUnaryCallOk,
+ Info: info,
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
+ return resp, nil
+}
+
+func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) {
+ const op = errors.Op("grpc_plugin_server_options")
+
+ var tcreds credentials.TransportCredentials
+ var opts []grpc.ServerOption
+ var cert tls.Certificate
+ var certPool *x509.CertPool
+ var rca []byte
+ var err error
+
+ if p.config.EnableTLS() {
+ // if client CA is not empty we combine it with Cert and Key
+ if p.config.TLS.RootCA != "" {
+ cert, err = tls.LoadX509KeyPair(p.config.TLS.Cert, p.config.TLS.Key)
+ if err != nil {
+ return nil, err
+ }
+
+ certPool, err = x509.SystemCertPool()
+ if err != nil {
+ return nil, err
+ }
+ if certPool == nil {
+ certPool = x509.NewCertPool()
+ }
+
+ rca, err = os.ReadFile(p.config.TLS.RootCA)
+ if err != nil {
+ return nil, err
+ }
+
+ if ok := certPool.AppendCertsFromPEM(rca); !ok {
+ return nil, errors.E(op, errors.Str("could not append Certs from PEM"))
+ }
+
+ tcreds = credentials.NewTLS(&tls.Config{
+ MinVersion: tls.VersionTLS12,
+ ClientAuth: tls.RequireAndVerifyClientCert,
+ Certificates: []tls.Certificate{cert},
+ ClientCAs: certPool,
+ })
+ } else {
+ tcreds, err = credentials.NewServerTLSFromFile(p.config.TLS.Cert, p.config.TLS.Key)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ serverOptions := []grpc.ServerOption{
+ grpc.MaxSendMsgSize(int(p.config.MaxSendMsgSize)),
+ grpc.MaxRecvMsgSize(int(p.config.MaxRecvMsgSize)),
+ grpc.KeepaliveParams(keepalive.ServerParameters{
+ MaxConnectionIdle: p.config.MaxConnectionIdle,
+ MaxConnectionAge: p.config.MaxConnectionAge,
+ MaxConnectionAgeGrace: p.config.MaxConnectionAge,
+ Time: p.config.PingTime,
+ Timeout: p.config.Timeout,
+ }),
+ grpc.MaxConcurrentStreams(uint32(p.config.MaxConcurrentStreams)),
+ }
+
+ opts = append(opts, grpc.Creds(tcreds))
+ opts = append(opts, serverOptions...)
+ }
+
+ opts = append(opts, p.opts...)
+
+ // custom codec is required to bypass protobuf, common interceptor used for debug and stats
+ return append(
+ opts,
+ grpc.UnaryInterceptor(p.interceptor),
+ // TODO(rustatian): check deprecation
+ // grpc.CustomCodec(&codec{encoding.GetCodec(encCodec)}),
+ ), nil
+}