diff options
-rw-r--r-- | .vscode/settings.json | 2 | ||||
-rw-r--r-- | Makefile | 15 | ||||
-rw-r--r-- | go.mod | 8 | ||||
-rw-r--r-- | go.sum | 14 | ||||
-rw-r--r-- | plugins/grpc/codec/codec.go (renamed from plugins/grpc/codec.go) | 24 | ||||
-rw-r--r-- | plugins/grpc/codec/codec_test.go (renamed from plugins/grpc/codec_test.go) | 18 | ||||
-rw-r--r-- | plugins/grpc/config.go | 2 | ||||
-rw-r--r-- | plugins/grpc/parser/message.proto | 7 | ||||
-rw-r--r-- | plugins/grpc/parser/parse.go | 114 | ||||
-rw-r--r-- | plugins/grpc/parser/parse_test.go | 71 | ||||
-rw-r--r-- | plugins/grpc/parser/pong.proto | 10 | ||||
-rw-r--r-- | plugins/grpc/parser/test.proto | 20 | ||||
-rw-r--r-- | plugins/grpc/parser/test_import.proto | 12 | ||||
-rw-r--r-- | plugins/grpc/parser/test_nested/message.proto | 7 | ||||
-rw-r--r-- | plugins/grpc/parser/test_nested/pong.proto | 10 | ||||
-rw-r--r-- | plugins/grpc/parser/test_nested/test_import.proto | 12 | ||||
-rw-r--r-- | plugins/grpc/plugin.go | 10 | ||||
-rw-r--r-- | plugins/grpc/proxy/proxy.go | 203 | ||||
-rw-r--r-- | plugins/grpc/proxy/proxy_test.go | 134 | ||||
-rw-r--r-- | plugins/grpc/server.go | 34 |
20 files changed, 676 insertions, 51 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index 44b782ad..50bc942d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,8 +3,10 @@ "addrs", "amqp", "amqpjobs", + "anypb", "boltdb", "codecov", + "Conv", "golangci", "gomemcache", "goridge", @@ -17,11 +17,15 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.out -covermode=atomic ./plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.out -covermode=atomic ./plugins/http/config + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_codec.out -covermode=atomic ./plugins/grpc/codec + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_parser.out -covermode=atomic ./plugins/grpc/parser + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_proxy.out -covermode=atomic ./plugins/grpc/proxy go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.out -covermode=atomic ./plugins/server go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.out -covermode=atomic ./plugins/jobs/job go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.out -covermode=atomic ./tests/plugins/jobs go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.out -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_plugin.out -covermode=atomic ./tests/plugins/grpc go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.out -covermode=atomic ./tests/plugins/broadcast go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.out -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.out -covermode=atomic ./tests/plugins/http @@ -54,14 +58,17 @@ test: ## Run application tests go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server go test -v -race -tags=debug ./plugins/jobs/job - go test -v -race -tags=debug ./tests/plugins/grpc + go test -v -race -tags=debug ./plugins/websockets + go test -v -race -tags=debug ./plugins/grpc + go test -v -race -tags=debug ./plugins/grpc/codec + go test -v -race -tags=debug ./plugins/grpc/parser + go test -v -race -tags=debug ./plugins/grpc/proxy + go test -v -race -tags=debug ./tests/plugins/jobs go test -v -race -tags=debug ./tests/plugins/kv go test -v -race -tags=debug ./tests/plugins/broadcast go test -v -race -tags=debug ./tests/plugins/websockets - go test -v -race -tags=debug ./tests/plugins/jobs - go test -v -race -tags=debug ./plugins/websockets - go test -v -race -tags=debug ./plugins/grpc go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./tests/plugins/grpc go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload go test -v -race -tags=debug ./tests/plugins/server @@ -14,7 +14,7 @@ require ( github.com/beanstalkd/go-beanstalk v0.1.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cenkalti/backoff/v4 v4.1.1 - github.com/fasthttp/websocket v1.4.3 + github.com/fasthttp/websocket v1.4.3-rc.8 github.com/fatih/color v1.12.0 github.com/go-redis/redis/v8 v8.11.3 github.com/gofiber/fiber/v2 v2.18.0 @@ -57,9 +57,9 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/emicklei/proto v1.9.1 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-ole/go-ole v1.2.5 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-colorable v0.1.8 // indirect @@ -73,7 +73,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 // indirect + github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52 // indirect github.com/spf13/afero v1.6.0 // indirect github.com/spf13/cast v1.4.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -93,3 +93,5 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) + +require github.com/golang/protobuf v1.5.2 // indirect @@ -50,7 +50,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM= github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= @@ -113,6 +112,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/emicklei/proto v1.9.1 h1:MUgjFo5xlMwYv72TnF5xmmdKZ04u+dVbv6wdARv16D8= +github.com/emicklei/proto v1.9.1/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -121,8 +122,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/fasthttp/websocket v1.4.3 h1:qjhRJ/rTy4KB8oBxljEC00SDt6HUY9jLRfM601SUdS4= -github.com/fasthttp/websocket v1.4.3/go.mod h1:5r4oKssgS7W6Zn6mPWap3NWzNPJNzUUh3baWTOhcYQk= +github.com/fasthttp/websocket v1.4.3-rc.8 h1:6P/+ejKdkLC9UhkY7GlShGWYMDBiWQtIECLBTDZ/2LU= +github.com/fasthttp/websocket v1.4.3-rc.8/go.mod h1:4m/MeZnTBQR2coy0HDUpyBXDkgtl2SxO+GZng0EKr6k= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= @@ -265,7 +266,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= @@ -356,9 +356,9 @@ github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQD github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8= -github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 h1:N3Af8f13ooDKcIhsmFT7Z05CStZWu4C7Md0uDEy4q6o= github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiAeG/aFYVs2i+Dyosoo7FNcm+Pi8iK6ZUrX8= +github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52 h1:FODZE/jDkENIpW3JiMA9sXBQfNklTfClUNhR9k37dPY= +github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU= github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -405,10 +405,8 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE= github.com/valyala/fasthttp v1.29.0 h1:F5GKpytwFk5OhCuRh6H+d4vZAcEeNAwPTdwQnm6IERY= github.com/valyala/fasthttp v1.29.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= diff --git a/plugins/grpc/codec.go b/plugins/grpc/codec/codec.go index aeb373b9..d34cedf1 100644 --- a/plugins/grpc/codec.go +++ b/plugins/grpc/codec/codec.go @@ -1,21 +1,21 @@ -package grpc +package codec import "google.golang.org/grpc/encoding" -type rawMessage []byte +type RawMessage []byte const cName string = "proto" const rm string = "rawMessage" -func (r rawMessage) Reset() {} -func (rawMessage) ProtoMessage() {} -func (rawMessage) String() string { return rm } +func (r RawMessage) Reset() {} +func (RawMessage) ProtoMessage() {} +func (RawMessage) String() string { return rm } -type codec struct{ base encoding.Codec } +type Codec struct{ base encoding.Codec } // Marshal returns the wire format of v. rawMessages would be returned without encoding. -func (c *codec) Marshal(v interface{}) ([]byte, error) { - if raw, ok := v.(rawMessage); ok { +func (c *Codec) Marshal(v interface{}) ([]byte, error) { + if raw, ok := v.(RawMessage); ok { return raw, nil } @@ -23,8 +23,8 @@ func (c *codec) Marshal(v interface{}) ([]byte, error) { } // Unmarshal parses the wire format into v. rawMessages would not be unmarshalled. -func (c *codec) Unmarshal(data []byte, v interface{}) error { - if raw, ok := v.(*rawMessage); ok { +func (c *Codec) Unmarshal(data []byte, v interface{}) error { + if raw, ok := v.(*RawMessage); ok { *raw = data return nil } @@ -32,11 +32,11 @@ func (c *codec) Unmarshal(data []byte, v interface{}) error { return c.base.Unmarshal(data, v) } -func (c *codec) Name() string { +func (c *Codec) Name() string { return cName } // String return codec name. -func (c *codec) String() string { +func (c *Codec) String() string { return "raw:" + c.base.Name() } diff --git a/plugins/grpc/codec_test.go b/plugins/grpc/codec/codec_test.go index 5f94b745..60efb072 100644 --- a/plugins/grpc/codec_test.go +++ b/plugins/grpc/codec/codec_test.go @@ -1,4 +1,4 @@ -package grpc +package codec import ( "testing" @@ -22,18 +22,18 @@ func (jsonCodec) Name() string { } func TestCodec_String(t *testing.T) { - c := codec{jsonCodec{}} + c := Codec{jsonCodec{}} assert.Equal(t, "raw:json", c.String()) - r := rawMessage{} + r := RawMessage{} r.Reset() r.ProtoMessage() assert.Equal(t, "rawMessage", r.String()) } func TestCodec_Unmarshal_ByPass(t *testing.T) { - c := codec{jsonCodec{}} + c := Codec{jsonCodec{}} s := struct { Name string @@ -44,7 +44,7 @@ func TestCodec_Unmarshal_ByPass(t *testing.T) { } func TestCodec_Marshal_ByPass(t *testing.T) { - c := codec{jsonCodec{}} + c := Codec{jsonCodec{}} s := struct { Name string @@ -59,18 +59,18 @@ func TestCodec_Marshal_ByPass(t *testing.T) { } func TestCodec_Unmarshal_Raw(t *testing.T) { - c := codec{jsonCodec{}} + c := Codec{jsonCodec{}} - s := rawMessage{} + s := RawMessage{} assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) assert.Equal(t, `{"name":"name"}`, string(s)) } func TestCodec_Marshal_Raw(t *testing.T) { - c := codec{jsonCodec{}} + c := Codec{jsonCodec{}} - s := rawMessage(`{"Name":"name"}`) + s := RawMessage(`{"Name":"name"}`) d, err := c.Marshal(s) assert.NoError(t, err) diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go index 53c50e22..8a3af6a2 100644 --- a/plugins/grpc/config.go +++ b/plugins/grpc/config.go @@ -12,7 +12,7 @@ type Config struct { TLS *TLS - grpcPool pool.Pool + grpcPool pool.Config MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"` MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"` MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"` diff --git a/plugins/grpc/parser/message.proto b/plugins/grpc/parser/message.proto new file mode 100644 index 00000000..a4012010 --- /dev/null +++ b/plugins/grpc/parser/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package app.namespace; + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/parse.go b/plugins/grpc/parser/parse.go new file mode 100644 index 00000000..d59b0927 --- /dev/null +++ b/plugins/grpc/parser/parse.go @@ -0,0 +1,114 @@ +package parser + +import ( + "bytes" + "io" + "os" + + pp "github.com/emicklei/proto" +) + +// Service contains information about singular GRPC service. +type Service struct { + // Package defines service namespace. + Package string + + // Name defines service name. + Name string + + // Methods list. + Methods []Method +} + +// Method describes singular RPC method. +type Method struct { + // Name is method name. + Name string + + // StreamsRequest defines if method accept stream input. + StreamsRequest bool + + // RequestType defines message name (from the same package) of method input. + RequestType string + + // StreamsReturns defines if method streams result. + StreamsReturns bool + + // ReturnsType defines message name (from the same package) of method return value. + ReturnsType string +} + +// File parses given proto file or returns error. +func File(file string, importPath string) ([]Service, error) { + reader, _ := os.Open(file) + defer reader.Close() + + return parse(reader, importPath) +} + +// Bytes parses string into proto definition. +func Bytes(data []byte) ([]Service, error) { + return parse(bytes.NewBuffer(data), "") +} + +func parse(reader io.Reader, importPath string) ([]Service, error) { + proto, err := pp.NewParser(reader).Parse() + if err != nil { + return nil, err + } + + return parseServices( + proto, + parsePackage(proto), + importPath, + ) +} + +func parsePackage(proto *pp.Proto) string { + for _, e := range proto.Elements { + if p, ok := e.(*pp.Package); ok { + return p.Name + } + } + + return "" +} + +func parseServices(proto *pp.Proto, pkg string, importPath string) ([]Service, error) { + services := make([]Service, 0) + + pp.Walk(proto, pp.WithService(func(service *pp.Service) { + services = append(services, Service{ + Package: pkg, + Name: service.Name, + Methods: parseMethods(service), + }) + })) + + pp.Walk(proto, func(v pp.Visitee) { + if i, ok := v.(*pp.Import); ok { + if im, err := File(importPath+"/"+i.Filename, importPath); err == nil { + services = append(services, im...) + } + } + }) + + return services, nil +} + +func parseMethods(s *pp.Service) []Method { + methods := make([]Method, 0) + for _, e := range s.Elements { + if m, ok := e.(*pp.RPC); ok { + methods = append(methods, Method{ + Name: m.Name, + StreamsRequest: m.StreamsRequest, + RequestType: m.RequestType, + StreamsReturns: m.StreamsReturns, + ReturnsType: m.ReturnsType, + }) + } + } + + return methods +} diff --git a/plugins/grpc/parser/parse_test.go b/plugins/grpc/parser/parse_test.go new file mode 100644 index 00000000..b71c133d --- /dev/null +++ b/plugins/grpc/parser/parse_test.go @@ -0,0 +1,71 @@ +package parser + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseFile(t *testing.T) { + services, err := File("test.proto", "") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseFileWithImportsNestedFolder(t *testing.T) { + services, err := File("./test_nested/test_import.proto", "./test_nested") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseFileWithImports(t *testing.T) { + services, err := File("test_import.proto", ".") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseNotFound(t *testing.T) { + _, err := File("test2.proto", "") + assert.Error(t, err) +} + +func TestParseBytes(t *testing.T) { + services, err := Bytes([]byte{}) + assert.NoError(t, err) + assert.Len(t, services, 0) +} + +func TestParseString(t *testing.T) { + services, err := Bytes([]byte(` +syntax = "proto3"; +package app.namespace; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +} + +message Message { + string msg = 1; + int64 value = 2; +} +`)) + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} diff --git a/plugins/grpc/parser/pong.proto b/plugins/grpc/parser/pong.proto new file mode 100644 index 00000000..9756fabe --- /dev/null +++ b/plugins/grpc/parser/pong.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test.proto b/plugins/grpc/parser/test.proto new file mode 100644 index 00000000..e2230954 --- /dev/null +++ b/plugins/grpc/parser/test.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; +package app.namespace; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +} + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_import.proto b/plugins/grpc/parser/test_import.proto new file mode 100644 index 00000000..1b954fc1 --- /dev/null +++ b/plugins/grpc/parser/test_import.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; +import "pong.proto"; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/message.proto b/plugins/grpc/parser/test_nested/message.proto new file mode 100644 index 00000000..a4012010 --- /dev/null +++ b/plugins/grpc/parser/test_nested/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package app.namespace; + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/pong.proto b/plugins/grpc/parser/test_nested/pong.proto new file mode 100644 index 00000000..9756fabe --- /dev/null +++ b/plugins/grpc/parser/test_nested/pong.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/test_import.proto b/plugins/grpc/parser/test_nested/test_import.proto new file mode 100644 index 00000000..a3a476ba --- /dev/null +++ b/plugins/grpc/parser/test_nested/test_import.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; +import "pong.proto"; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go index b424c5d0..9285afe5 100644 --- a/plugins/grpc/plugin.go +++ b/plugins/grpc/plugin.go @@ -2,7 +2,9 @@ package grpc import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/grpc" "google.golang.org/grpc/encoding" @@ -13,8 +15,10 @@ const ( ) type Plugin struct { - config *Config - opts []grpc.ServerOption + config *Config + gPool pool.Pool + opts []grpc.ServerOption + services []func(server *grpc.Server) cfg config.Configurer log logger.Logger @@ -24,7 +28,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("grpc_plugin_init") // register the codec - encoding.RegisterCodec(&codec{}) + encoding.RegisterCodec(&codec.Codec{}) return nil } diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go new file mode 100644 index 00000000..7a8eaa1f --- /dev/null +++ b/plugins/grpc/proxy/proxy.go @@ -0,0 +1,203 @@ +package proxy + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +// base interface for Proxy class +type proxyService interface { + // RegisterMethod registers new RPC method. + RegisterMethod(method string) + + // ServiceDesc returns service description for the proxy. + ServiceDesc() *grpc.ServiceDesc +} + +// carry details about service, method and RPC context to PHP process +type rpcContext struct { + Service string `json:"service"` + Method string `json:"method"` + Context map[string][]string `json:"context"` +} + +// Proxy manages GRPC/RoadRunner bridge. +type Proxy struct { + grpcPool pool.Pool + name string + metadata string + methods []string +} + +// NewProxy creates new service proxy object. +func NewProxy(name string, metadata string, grpcPool pool.Pool) *Proxy { + return &Proxy{ + grpcPool: grpcPool, + name: name, + metadata: metadata, + methods: make([]string, 0), + } +} + +// RegisterMethod registers new RPC method. +func (p *Proxy) RegisterMethod(method string) { + p.methods = append(p.methods, method) +} + +// ServiceDesc returns service description for the proxy. +func (p *Proxy) ServiceDesc() *grpc.ServiceDesc { + desc := &grpc.ServiceDesc{ + ServiceName: p.name, + Metadata: p.metadata, + HandlerType: (*proxyService)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + } + + // Registering methods + for _, m := range p.methods { + desc.Methods = append(desc.Methods, grpc.MethodDesc{ + MethodName: m, + Handler: p.methodHandler(m), + }) + } + + return desc +} + +// Generate method handler proxy. +func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := codec.RawMessage{} + if err := dec(&in); err != nil { + return nil, wrapError(err) + } + + if interceptor == nil { + return p.invoke(ctx, method, in) + } + + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: fmt.Sprintf("/%s/%s", p.name, method), + } + + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return p.invoke(ctx, method, req.(codec.RawMessage)) + } + + return interceptor(ctx, in, info, handler) + } +} + +func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) { + payload, err := p.makePayload(ctx, method, in) + if err != nil { + return nil, err + } + + resp, err := p.grpcPool.Exec(payload) + + if err != nil { + return nil, wrapError(err) + } + + md, err := p.responseMetadata(resp) + if err != nil { + return nil, err + } + ctx = metadata.NewIncomingContext(ctx, md) + err = grpc.SetHeader(ctx, md) + if err != nil { + return nil, err + } + + return codec.RawMessage(resp.Body), nil +} + +// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD +func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) { + var md metadata.MD + if resp == nil || len(resp.Context) == 0 { + return md, nil + } + + var rpcMetadata map[string]string + err := json.Unmarshal(resp.Context, &rpcMetadata) + if err != nil { + return md, err + } + + if len(rpcMetadata) > 0 { + md = metadata.New(rpcMetadata) + } + + return md, nil +} + +// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error +func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) { + ctxMD := make(map[string][]string) + + if md, ok := metadata.FromIncomingContext(ctx); ok { + for k, v := range md { + ctxMD[k] = v + } + } + + if pr, ok := peer.FromContext(ctx); ok { + ctxMD[":peer.address"] = []string{pr.Addr.String()} + if pr.AuthInfo != nil { + ctxMD[":peer.auth-type"] = []string{pr.AuthInfo.AuthType()} + } + } + + ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD}) + + if err != nil { + return nil, err + } + + return &payload.Payload{Context: ctxData, Body: body}, nil +} + +// mounts proper error code for the error +func wrapError(err error) error { + // internal agreement + if strings.Contains(err.Error(), "|:|") { + chunks := strings.Split(err.Error(), "|:|") + code := codes.Internal + + if phpCode, errConv := strconv.Atoi(chunks[0]); errConv == nil { + code = codes.Code(phpCode) + } + + st := status.New(code, chunks[1]).Proto() + + for _, detailsMessage := range chunks[2:] { + anyDetailsMessage := anypb.Any{} + errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage) + if errP == nil { + st.Details = append(st.Details, &anyDetailsMessage) + } + } + + return status.ErrorProto(st) + } + + return status.Error(codes.Internal, err.Error()) +} diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go new file mode 100644 index 00000000..2c024ee3 --- /dev/null +++ b/plugins/grpc/proxy/proxy_test.go @@ -0,0 +1,134 @@ +package proxy + +// import ( +// "testing" +// "time" + +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/stretchr/testify/assert" +// "golang.org/x/net/context" +// "google.golang.org/grpc" +// "google.golang.org/grpc/codes" +// "google.golang.org/grpc/metadata" +// "google.golang.org/grpc/status" +// ) + +// const addr = "localhost:9080" + +// func Test_Proxy_Error(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) + +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) + +// assert.NoError(t, c.Init(&testCfg{ +// grpcCfg: `{ +// "listen": "tcp://:9080", +// "tls": { +// "key": "tests/server.key", +// "cert": "tests/server.crt" +// }, +// "proto": "tests/test.proto", +// "workers":{ +// "command": "php tests/worker.php", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10, +// "destroyTimeout": 10 +// } +// } +// }`, +// })) + +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) + +// // should do nothing +// s.(*Service).Stop() + +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() + +// cl, cn := getClient(addr) +// defer cn.Close() + +// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"}) + +// assert.Error(t, err) +// se, _ := status.FromError(err) +// assert.Equal(t, "nothing here", se.Message()) +// assert.Equal(t, codes.NotFound, se.Code()) + +// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"}) + +// assert.Error(t, errWithDetails) +// statusWithDetails, _ := status.FromError(errWithDetails) +// assert.Equal(t, "main exception message", statusWithDetails.Message()) +// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code()) + +// details := statusWithDetails.Details() + +// detailsMessageForException := details[0].(*tests.DetailsMessageForException) + +// assert.Equal(t, detailsMessageForException.Code, uint64(1)) +// assert.Equal(t, detailsMessageForException.Message, "details message") +// } + +// func Test_Proxy_Metadata(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) + +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) + +// assert.NoError(t, c.Init(&testCfg{ +// grpcCfg: `{ +// "listen": "tcp://:9080", +// "tls": { +// "key": "tests/server.key", +// "cert": "tests/server.crt" +// }, +// "proto": "tests/test.proto", +// "workers":{ +// "command": "php tests/worker.php", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10, +// "destroyTimeout": 10 +// } +// } +// }`, +// })) + +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) + +// // should do nothing +// s.(*Service).Stop() + +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() + +// cl, cn := getClient(addr) +// defer cn.Close() + +// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value") +// var header metadata.MD +// out, err := cl.Info( +// ctx, +// &tests.Message{Msg: "MD"}, +// grpc.Header(&header), +// grpc.WaitForReady(true), +// ) +// assert.Equal(t, []string{"bar"}, header.Get("foo")) +// assert.NoError(t, err) +// assert.Equal(t, `["proxy-value"]`, out.Msg) +// } diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go index 735d53e9..24759fba 100644 --- a/plugins/grpc/server.go +++ b/plugins/grpc/server.go @@ -4,14 +4,16 @@ import ( "context" "crypto/tls" "crypto/x509" + "fmt" "os" + "path" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/grpc/parser" + "github.com/spiral/roadrunner/v2/plugins/grpc/proxy" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/reflection" - "google.golang.org/protobuf/reflect/protoregistry" ) func (p *Plugin) createGRPCserver() (*grpc.Server, error) { @@ -22,18 +24,28 @@ func (p *Plugin) createGRPCserver() (*grpc.Server, error) { } server := grpc.NewServer(opts...) - reflection.Register(server) - fd, err := protoregistry.GlobalFiles.FindFileByPath("file") - if err != nil { - return nil, errors.E(op, err) - } + if p.config.Proto != "" { + // php proxy services + services, err := parser.File(p.config.Proto, path.Dir(p.config.Proto)) + if err != nil { + return nil, err + } - _ = fd + for _, service := range services { + p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool) + for _, m := range service.Methods { + p.RegisterMethod(m.Name) + } - /* - proto descriptors parser - */ + server.RegisterService(p.ServiceDesc(), p) + } + } + + // external and native services + for _, r := range p.services { + r(server) + } return server, nil } |