summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.vscode/settings.json2
-rw-r--r--Makefile15
-rw-r--r--go.mod8
-rw-r--r--go.sum14
-rw-r--r--plugins/grpc/codec/codec.go (renamed from plugins/grpc/codec.go)24
-rw-r--r--plugins/grpc/codec/codec_test.go (renamed from plugins/grpc/codec_test.go)18
-rw-r--r--plugins/grpc/config.go2
-rw-r--r--plugins/grpc/parser/message.proto7
-rw-r--r--plugins/grpc/parser/parse.go114
-rw-r--r--plugins/grpc/parser/parse_test.go71
-rw-r--r--plugins/grpc/parser/pong.proto10
-rw-r--r--plugins/grpc/parser/test.proto20
-rw-r--r--plugins/grpc/parser/test_import.proto12
-rw-r--r--plugins/grpc/parser/test_nested/message.proto7
-rw-r--r--plugins/grpc/parser/test_nested/pong.proto10
-rw-r--r--plugins/grpc/parser/test_nested/test_import.proto12
-rw-r--r--plugins/grpc/plugin.go10
-rw-r--r--plugins/grpc/proxy/proxy.go203
-rw-r--r--plugins/grpc/proxy/proxy_test.go134
-rw-r--r--plugins/grpc/server.go34
20 files changed, 676 insertions, 51 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 44b782ad..50bc942d 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -3,8 +3,10 @@
"addrs",
"amqp",
"amqpjobs",
+ "anypb",
"boltdb",
"codecov",
+ "Conv",
"golangci",
"gomemcache",
"goridge",
diff --git a/Makefile b/Makefile
index 40b0a6de..109c963e 100644
--- a/Makefile
+++ b/Makefile
@@ -17,11 +17,15 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.out -covermode=atomic ./plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.out -covermode=atomic ./plugins/http/config
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_codec.out -covermode=atomic ./plugins/grpc/codec
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_parser.out -covermode=atomic ./plugins/grpc/parser
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_proxy.out -covermode=atomic ./plugins/grpc/proxy
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.out -covermode=atomic ./plugins/server
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.out -covermode=atomic ./plugins/jobs/job
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.out -covermode=atomic ./tests/plugins/jobs
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.out -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_plugin.out -covermode=atomic ./tests/plugins/grpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.out -covermode=atomic ./tests/plugins/broadcast
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.out -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.out -covermode=atomic ./tests/plugins/http
@@ -54,14 +58,17 @@ test: ## Run application tests
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./plugins/server
go test -v -race -tags=debug ./plugins/jobs/job
- go test -v -race -tags=debug ./tests/plugins/grpc
+ go test -v -race -tags=debug ./plugins/websockets
+ go test -v -race -tags=debug ./plugins/grpc
+ go test -v -race -tags=debug ./plugins/grpc/codec
+ go test -v -race -tags=debug ./plugins/grpc/parser
+ go test -v -race -tags=debug ./plugins/grpc/proxy
+ go test -v -race -tags=debug ./tests/plugins/jobs
go test -v -race -tags=debug ./tests/plugins/kv
go test -v -race -tags=debug ./tests/plugins/broadcast
go test -v -race -tags=debug ./tests/plugins/websockets
- go test -v -race -tags=debug ./tests/plugins/jobs
- go test -v -race -tags=debug ./plugins/websockets
- go test -v -race -tags=debug ./plugins/grpc
go test -v -race -tags=debug ./tests/plugins/http
+ go test -v -race -tags=debug ./tests/plugins/grpc
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
go test -v -race -tags=debug ./tests/plugins/server
diff --git a/go.mod b/go.mod
index b7141c4a..6c1616f5 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
github.com/beanstalkd/go-beanstalk v0.1.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cenkalti/backoff/v4 v4.1.1
- github.com/fasthttp/websocket v1.4.3
+ github.com/fasthttp/websocket v1.4.3-rc.8
github.com/fatih/color v1.12.0
github.com/go-redis/redis/v8 v8.11.3
github.com/gofiber/fiber/v2 v2.18.0
@@ -57,9 +57,9 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+ github.com/emicklei/proto v1.9.1
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-ole/go-ole v1.2.5 // indirect
- github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
@@ -73,7 +73,7 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
- github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 // indirect
+ github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -93,3 +93,5 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
+
+require github.com/golang/protobuf v1.5.2 // indirect
diff --git a/go.sum b/go.sum
index 7dec39f0..07e30edd 100644
--- a/go.sum
+++ b/go.sum
@@ -50,7 +50,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
-github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM=
github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
@@ -113,6 +112,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/emicklei/proto v1.9.1 h1:MUgjFo5xlMwYv72TnF5xmmdKZ04u+dVbv6wdARv16D8=
+github.com/emicklei/proto v1.9.1/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -121,8 +122,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
-github.com/fasthttp/websocket v1.4.3 h1:qjhRJ/rTy4KB8oBxljEC00SDt6HUY9jLRfM601SUdS4=
-github.com/fasthttp/websocket v1.4.3/go.mod h1:5r4oKssgS7W6Zn6mPWap3NWzNPJNzUUh3baWTOhcYQk=
+github.com/fasthttp/websocket v1.4.3-rc.8 h1:6P/+ejKdkLC9UhkY7GlShGWYMDBiWQtIECLBTDZ/2LU=
+github.com/fasthttp/websocket v1.4.3-rc.8/go.mod h1:4m/MeZnTBQR2coy0HDUpyBXDkgtl2SxO+GZng0EKr6k=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
@@ -265,7 +266,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
@@ -356,9 +356,9 @@ github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQD
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
-github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8=
-github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 h1:N3Af8f13ooDKcIhsmFT7Z05CStZWu4C7Md0uDEy4q6o=
github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiAeG/aFYVs2i+Dyosoo7FNcm+Pi8iK6ZUrX8=
+github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52 h1:FODZE/jDkENIpW3JiMA9sXBQfNklTfClUNhR9k37dPY=
+github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU=
github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
@@ -405,10 +405,8 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
-github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE=
github.com/valyala/fasthttp v1.29.0 h1:F5GKpytwFk5OhCuRh6H+d4vZAcEeNAwPTdwQnm6IERY=
github.com/valyala/fasthttp v1.29.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
-github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc=
diff --git a/plugins/grpc/codec.go b/plugins/grpc/codec/codec.go
index aeb373b9..d34cedf1 100644
--- a/plugins/grpc/codec.go
+++ b/plugins/grpc/codec/codec.go
@@ -1,21 +1,21 @@
-package grpc
+package codec
import "google.golang.org/grpc/encoding"
-type rawMessage []byte
+type RawMessage []byte
const cName string = "proto"
const rm string = "rawMessage"
-func (r rawMessage) Reset() {}
-func (rawMessage) ProtoMessage() {}
-func (rawMessage) String() string { return rm }
+func (r RawMessage) Reset() {}
+func (RawMessage) ProtoMessage() {}
+func (RawMessage) String() string { return rm }
-type codec struct{ base encoding.Codec }
+type Codec struct{ base encoding.Codec }
// Marshal returns the wire format of v. rawMessages would be returned without encoding.
-func (c *codec) Marshal(v interface{}) ([]byte, error) {
- if raw, ok := v.(rawMessage); ok {
+func (c *Codec) Marshal(v interface{}) ([]byte, error) {
+ if raw, ok := v.(RawMessage); ok {
return raw, nil
}
@@ -23,8 +23,8 @@ func (c *codec) Marshal(v interface{}) ([]byte, error) {
}
// Unmarshal parses the wire format into v. rawMessages would not be unmarshalled.
-func (c *codec) Unmarshal(data []byte, v interface{}) error {
- if raw, ok := v.(*rawMessage); ok {
+func (c *Codec) Unmarshal(data []byte, v interface{}) error {
+ if raw, ok := v.(*RawMessage); ok {
*raw = data
return nil
}
@@ -32,11 +32,11 @@ func (c *codec) Unmarshal(data []byte, v interface{}) error {
return c.base.Unmarshal(data, v)
}
-func (c *codec) Name() string {
+func (c *Codec) Name() string {
return cName
}
// String return codec name.
-func (c *codec) String() string {
+func (c *Codec) String() string {
return "raw:" + c.base.Name()
}
diff --git a/plugins/grpc/codec_test.go b/plugins/grpc/codec/codec_test.go
index 5f94b745..60efb072 100644
--- a/plugins/grpc/codec_test.go
+++ b/plugins/grpc/codec/codec_test.go
@@ -1,4 +1,4 @@
-package grpc
+package codec
import (
"testing"
@@ -22,18 +22,18 @@ func (jsonCodec) Name() string {
}
func TestCodec_String(t *testing.T) {
- c := codec{jsonCodec{}}
+ c := Codec{jsonCodec{}}
assert.Equal(t, "raw:json", c.String())
- r := rawMessage{}
+ r := RawMessage{}
r.Reset()
r.ProtoMessage()
assert.Equal(t, "rawMessage", r.String())
}
func TestCodec_Unmarshal_ByPass(t *testing.T) {
- c := codec{jsonCodec{}}
+ c := Codec{jsonCodec{}}
s := struct {
Name string
@@ -44,7 +44,7 @@ func TestCodec_Unmarshal_ByPass(t *testing.T) {
}
func TestCodec_Marshal_ByPass(t *testing.T) {
- c := codec{jsonCodec{}}
+ c := Codec{jsonCodec{}}
s := struct {
Name string
@@ -59,18 +59,18 @@ func TestCodec_Marshal_ByPass(t *testing.T) {
}
func TestCodec_Unmarshal_Raw(t *testing.T) {
- c := codec{jsonCodec{}}
+ c := Codec{jsonCodec{}}
- s := rawMessage{}
+ s := RawMessage{}
assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s))
assert.Equal(t, `{"name":"name"}`, string(s))
}
func TestCodec_Marshal_Raw(t *testing.T) {
- c := codec{jsonCodec{}}
+ c := Codec{jsonCodec{}}
- s := rawMessage(`{"Name":"name"}`)
+ s := RawMessage(`{"Name":"name"}`)
d, err := c.Marshal(s)
assert.NoError(t, err)
diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go
index 53c50e22..8a3af6a2 100644
--- a/plugins/grpc/config.go
+++ b/plugins/grpc/config.go
@@ -12,7 +12,7 @@ type Config struct {
TLS *TLS
- grpcPool pool.Pool
+ grpcPool pool.Config
MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"`
MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"`
MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"`
diff --git a/plugins/grpc/parser/message.proto b/plugins/grpc/parser/message.proto
new file mode 100644
index 00000000..a4012010
--- /dev/null
+++ b/plugins/grpc/parser/message.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+package app.namespace;
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/parse.go b/plugins/grpc/parser/parse.go
new file mode 100644
index 00000000..d59b0927
--- /dev/null
+++ b/plugins/grpc/parser/parse.go
@@ -0,0 +1,114 @@
+package parser
+
+import (
+ "bytes"
+ "io"
+ "os"
+
+ pp "github.com/emicklei/proto"
+)
+
+// Service contains information about singular GRPC service.
+type Service struct {
+ // Package defines service namespace.
+ Package string
+
+ // Name defines service name.
+ Name string
+
+ // Methods list.
+ Methods []Method
+}
+
+// Method describes singular RPC method.
+type Method struct {
+ // Name is method name.
+ Name string
+
+ // StreamsRequest defines if method accept stream input.
+ StreamsRequest bool
+
+ // RequestType defines message name (from the same package) of method input.
+ RequestType string
+
+ // StreamsReturns defines if method streams result.
+ StreamsReturns bool
+
+ // ReturnsType defines message name (from the same package) of method return value.
+ ReturnsType string
+}
+
+// File parses given proto file or returns error.
+func File(file string, importPath string) ([]Service, error) {
+ reader, _ := os.Open(file)
+ defer reader.Close()
+
+ return parse(reader, importPath)
+}
+
+// Bytes parses string into proto definition.
+func Bytes(data []byte) ([]Service, error) {
+ return parse(bytes.NewBuffer(data), "")
+}
+
+func parse(reader io.Reader, importPath string) ([]Service, error) {
+ proto, err := pp.NewParser(reader).Parse()
+ if err != nil {
+ return nil, err
+ }
+
+ return parseServices(
+ proto,
+ parsePackage(proto),
+ importPath,
+ )
+}
+
+func parsePackage(proto *pp.Proto) string {
+ for _, e := range proto.Elements {
+ if p, ok := e.(*pp.Package); ok {
+ return p.Name
+ }
+ }
+
+ return ""
+}
+
+func parseServices(proto *pp.Proto, pkg string, importPath string) ([]Service, error) {
+ services := make([]Service, 0)
+
+ pp.Walk(proto, pp.WithService(func(service *pp.Service) {
+ services = append(services, Service{
+ Package: pkg,
+ Name: service.Name,
+ Methods: parseMethods(service),
+ })
+ }))
+
+ pp.Walk(proto, func(v pp.Visitee) {
+ if i, ok := v.(*pp.Import); ok {
+ if im, err := File(importPath+"/"+i.Filename, importPath); err == nil {
+ services = append(services, im...)
+ }
+ }
+ })
+
+ return services, nil
+}
+
+func parseMethods(s *pp.Service) []Method {
+ methods := make([]Method, 0)
+ for _, e := range s.Elements {
+ if m, ok := e.(*pp.RPC); ok {
+ methods = append(methods, Method{
+ Name: m.Name,
+ StreamsRequest: m.StreamsRequest,
+ RequestType: m.RequestType,
+ StreamsReturns: m.StreamsReturns,
+ ReturnsType: m.ReturnsType,
+ })
+ }
+ }
+
+ return methods
+}
diff --git a/plugins/grpc/parser/parse_test.go b/plugins/grpc/parser/parse_test.go
new file mode 100644
index 00000000..b71c133d
--- /dev/null
+++ b/plugins/grpc/parser/parse_test.go
@@ -0,0 +1,71 @@
+package parser
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestParseFile(t *testing.T) {
+ services, err := File("test.proto", "")
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
+
+func TestParseFileWithImportsNestedFolder(t *testing.T) {
+ services, err := File("./test_nested/test_import.proto", "./test_nested")
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
+
+func TestParseFileWithImports(t *testing.T) {
+ services, err := File("test_import.proto", ".")
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
+
+func TestParseNotFound(t *testing.T) {
+ _, err := File("test2.proto", "")
+ assert.Error(t, err)
+}
+
+func TestParseBytes(t *testing.T) {
+ services, err := Bytes([]byte{})
+ assert.NoError(t, err)
+ assert.Len(t, services, 0)
+}
+
+func TestParseString(t *testing.T) {
+ services, err := Bytes([]byte(`
+syntax = "proto3";
+package app.namespace;
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+}
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+}
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+}
+`))
+ assert.NoError(t, err)
+ assert.Len(t, services, 2)
+
+ assert.Equal(t, "app.namespace", services[0].Package)
+}
diff --git a/plugins/grpc/parser/pong.proto b/plugins/grpc/parser/pong.proto
new file mode 100644
index 00000000..9756fabe
--- /dev/null
+++ b/plugins/grpc/parser/pong.proto
@@ -0,0 +1,10 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test.proto b/plugins/grpc/parser/test.proto
new file mode 100644
index 00000000..e2230954
--- /dev/null
+++ b/plugins/grpc/parser/test.proto
@@ -0,0 +1,20 @@
+syntax = "proto3";
+package app.namespace;
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+}
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+}
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_import.proto b/plugins/grpc/parser/test_import.proto
new file mode 100644
index 00000000..1b954fc1
--- /dev/null
+++ b/plugins/grpc/parser/test_import.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+import "pong.proto";
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_nested/message.proto b/plugins/grpc/parser/test_nested/message.proto
new file mode 100644
index 00000000..a4012010
--- /dev/null
+++ b/plugins/grpc/parser/test_nested/message.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+package app.namespace;
+
+message Message {
+ string msg = 1;
+ int64 value = 2;
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_nested/pong.proto b/plugins/grpc/parser/test_nested/pong.proto
new file mode 100644
index 00000000..9756fabe
--- /dev/null
+++ b/plugins/grpc/parser/test_nested/pong.proto
@@ -0,0 +1,10 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+
+// Pong service.
+service PongService {
+ rpc Pong (stream Message) returns (stream Message) {
+ }
+} \ No newline at end of file
diff --git a/plugins/grpc/parser/test_nested/test_import.proto b/plugins/grpc/parser/test_nested/test_import.proto
new file mode 100644
index 00000000..a3a476ba
--- /dev/null
+++ b/plugins/grpc/parser/test_nested/test_import.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+package app.namespace;
+
+import "message.proto";
+import "pong.proto";
+
+// Ping Service.
+service PingService {
+ // Ping Method.
+ rpc Ping (Message) returns (Message) {
+ }
+}
diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go
index b424c5d0..9285afe5 100644
--- a/plugins/grpc/plugin.go
+++ b/plugins/grpc/plugin.go
@@ -2,7 +2,9 @@ package grpc
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/codec"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
@@ -13,8 +15,10 @@ const (
)
type Plugin struct {
- config *Config
- opts []grpc.ServerOption
+ config *Config
+ gPool pool.Pool
+ opts []grpc.ServerOption
+ services []func(server *grpc.Server)
cfg config.Configurer
log logger.Logger
@@ -24,7 +28,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("grpc_plugin_init")
// register the codec
- encoding.RegisterCodec(&codec{})
+ encoding.RegisterCodec(&codec.Codec{})
return nil
}
diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go
new file mode 100644
index 00000000..7a8eaa1f
--- /dev/null
+++ b/plugins/grpc/proxy/proxy.go
@@ -0,0 +1,203 @@
+package proxy
+
+import (
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/codec"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/anypb"
+)
+
+// base interface for Proxy class
+type proxyService interface {
+ // RegisterMethod registers new RPC method.
+ RegisterMethod(method string)
+
+ // ServiceDesc returns service description for the proxy.
+ ServiceDesc() *grpc.ServiceDesc
+}
+
+// carry details about service, method and RPC context to PHP process
+type rpcContext struct {
+ Service string `json:"service"`
+ Method string `json:"method"`
+ Context map[string][]string `json:"context"`
+}
+
+// Proxy manages GRPC/RoadRunner bridge.
+type Proxy struct {
+ grpcPool pool.Pool
+ name string
+ metadata string
+ methods []string
+}
+
+// NewProxy creates new service proxy object.
+func NewProxy(name string, metadata string, grpcPool pool.Pool) *Proxy {
+ return &Proxy{
+ grpcPool: grpcPool,
+ name: name,
+ metadata: metadata,
+ methods: make([]string, 0),
+ }
+}
+
+// RegisterMethod registers new RPC method.
+func (p *Proxy) RegisterMethod(method string) {
+ p.methods = append(p.methods, method)
+}
+
+// ServiceDesc returns service description for the proxy.
+func (p *Proxy) ServiceDesc() *grpc.ServiceDesc {
+ desc := &grpc.ServiceDesc{
+ ServiceName: p.name,
+ Metadata: p.metadata,
+ HandlerType: (*proxyService)(nil),
+ Methods: []grpc.MethodDesc{},
+ Streams: []grpc.StreamDesc{},
+ }
+
+ // Registering methods
+ for _, m := range p.methods {
+ desc.Methods = append(desc.Methods, grpc.MethodDesc{
+ MethodName: m,
+ Handler: p.methodHandler(m),
+ })
+ }
+
+ return desc
+}
+
+// Generate method handler proxy.
+func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := codec.RawMessage{}
+ if err := dec(&in); err != nil {
+ return nil, wrapError(err)
+ }
+
+ if interceptor == nil {
+ return p.invoke(ctx, method, in)
+ }
+
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: fmt.Sprintf("/%s/%s", p.name, method),
+ }
+
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return p.invoke(ctx, method, req.(codec.RawMessage))
+ }
+
+ return interceptor(ctx, in, info, handler)
+ }
+}
+
+func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) {
+ payload, err := p.makePayload(ctx, method, in)
+ if err != nil {
+ return nil, err
+ }
+
+ resp, err := p.grpcPool.Exec(payload)
+
+ if err != nil {
+ return nil, wrapError(err)
+ }
+
+ md, err := p.responseMetadata(resp)
+ if err != nil {
+ return nil, err
+ }
+ ctx = metadata.NewIncomingContext(ctx, md)
+ err = grpc.SetHeader(ctx, md)
+ if err != nil {
+ return nil, err
+ }
+
+ return codec.RawMessage(resp.Body), nil
+}
+
+// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD
+func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) {
+ var md metadata.MD
+ if resp == nil || len(resp.Context) == 0 {
+ return md, nil
+ }
+
+ var rpcMetadata map[string]string
+ err := json.Unmarshal(resp.Context, &rpcMetadata)
+ if err != nil {
+ return md, err
+ }
+
+ if len(rpcMetadata) > 0 {
+ md = metadata.New(rpcMetadata)
+ }
+
+ return md, nil
+}
+
+// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error
+func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) {
+ ctxMD := make(map[string][]string)
+
+ if md, ok := metadata.FromIncomingContext(ctx); ok {
+ for k, v := range md {
+ ctxMD[k] = v
+ }
+ }
+
+ if pr, ok := peer.FromContext(ctx); ok {
+ ctxMD[":peer.address"] = []string{pr.Addr.String()}
+ if pr.AuthInfo != nil {
+ ctxMD[":peer.auth-type"] = []string{pr.AuthInfo.AuthType()}
+ }
+ }
+
+ ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD})
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &payload.Payload{Context: ctxData, Body: body}, nil
+}
+
+// mounts proper error code for the error
+func wrapError(err error) error {
+ // internal agreement
+ if strings.Contains(err.Error(), "|:|") {
+ chunks := strings.Split(err.Error(), "|:|")
+ code := codes.Internal
+
+ if phpCode, errConv := strconv.Atoi(chunks[0]); errConv == nil {
+ code = codes.Code(phpCode)
+ }
+
+ st := status.New(code, chunks[1]).Proto()
+
+ for _, detailsMessage := range chunks[2:] {
+ anyDetailsMessage := anypb.Any{}
+ errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage)
+ if errP == nil {
+ st.Details = append(st.Details, &anyDetailsMessage)
+ }
+ }
+
+ return status.ErrorProto(st)
+ }
+
+ return status.Error(codes.Internal, err.Error())
+}
diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go
new file mode 100644
index 00000000..2c024ee3
--- /dev/null
+++ b/plugins/grpc/proxy/proxy_test.go
@@ -0,0 +1,134 @@
+package proxy
+
+// import (
+// "testing"
+// "time"
+
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/stretchr/testify/assert"
+// "golang.org/x/net/context"
+// "google.golang.org/grpc"
+// "google.golang.org/grpc/codes"
+// "google.golang.org/grpc/metadata"
+// "google.golang.org/grpc/status"
+// )
+
+// const addr = "localhost:9080"
+
+// func Test_Proxy_Error(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+
+// assert.NoError(t, c.Init(&testCfg{
+// grpcCfg: `{
+// "listen": "tcp://:9080",
+// "tls": {
+// "key": "tests/server.key",
+// "cert": "tests/server.crt"
+// },
+// "proto": "tests/test.proto",
+// "workers":{
+// "command": "php tests/worker.php",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10,
+// "destroyTimeout": 10
+// }
+// }
+// }`,
+// }))
+
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+
+// // should do nothing
+// s.(*Service).Stop()
+
+// go func() { assert.NoError(t, c.Serve()) }()
+// time.Sleep(time.Millisecond * 100)
+// defer c.Stop()
+
+// cl, cn := getClient(addr)
+// defer cn.Close()
+
+// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"})
+
+// assert.Error(t, err)
+// se, _ := status.FromError(err)
+// assert.Equal(t, "nothing here", se.Message())
+// assert.Equal(t, codes.NotFound, se.Code())
+
+// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"})
+
+// assert.Error(t, errWithDetails)
+// statusWithDetails, _ := status.FromError(errWithDetails)
+// assert.Equal(t, "main exception message", statusWithDetails.Message())
+// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code())
+
+// details := statusWithDetails.Details()
+
+// detailsMessageForException := details[0].(*tests.DetailsMessageForException)
+
+// assert.Equal(t, detailsMessageForException.Code, uint64(1))
+// assert.Equal(t, detailsMessageForException.Message, "details message")
+// }
+
+// func Test_Proxy_Metadata(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+
+// assert.NoError(t, c.Init(&testCfg{
+// grpcCfg: `{
+// "listen": "tcp://:9080",
+// "tls": {
+// "key": "tests/server.key",
+// "cert": "tests/server.crt"
+// },
+// "proto": "tests/test.proto",
+// "workers":{
+// "command": "php tests/worker.php",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10,
+// "destroyTimeout": 10
+// }
+// }
+// }`,
+// }))
+
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+
+// // should do nothing
+// s.(*Service).Stop()
+
+// go func() { assert.NoError(t, c.Serve()) }()
+// time.Sleep(time.Millisecond * 100)
+// defer c.Stop()
+
+// cl, cn := getClient(addr)
+// defer cn.Close()
+
+// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value")
+// var header metadata.MD
+// out, err := cl.Info(
+// ctx,
+// &tests.Message{Msg: "MD"},
+// grpc.Header(&header),
+// grpc.WaitForReady(true),
+// )
+// assert.Equal(t, []string{"bar"}, header.Get("foo"))
+// assert.NoError(t, err)
+// assert.Equal(t, `["proxy-value"]`, out.Msg)
+// }
diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go
index 735d53e9..24759fba 100644
--- a/plugins/grpc/server.go
+++ b/plugins/grpc/server.go
@@ -4,14 +4,16 @@ import (
"context"
"crypto/tls"
"crypto/x509"
+ "fmt"
"os"
+ "path"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/parser"
+ "github.com/spiral/roadrunner/v2/plugins/grpc/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/reflection"
- "google.golang.org/protobuf/reflect/protoregistry"
)
func (p *Plugin) createGRPCserver() (*grpc.Server, error) {
@@ -22,18 +24,28 @@ func (p *Plugin) createGRPCserver() (*grpc.Server, error) {
}
server := grpc.NewServer(opts...)
- reflection.Register(server)
- fd, err := protoregistry.GlobalFiles.FindFileByPath("file")
- if err != nil {
- return nil, errors.E(op, err)
- }
+ if p.config.Proto != "" {
+ // php proxy services
+ services, err := parser.File(p.config.Proto, path.Dir(p.config.Proto))
+ if err != nil {
+ return nil, err
+ }
- _ = fd
+ for _, service := range services {
+ p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool)
+ for _, m := range service.Methods {
+ p.RegisterMethod(m.Name)
+ }
- /*
- proto descriptors parser
- */
+ server.RegisterService(p.ServiceDesc(), p)
+ }
+ }
+
+ // external and native services
+ for _, r := range p.services {
+ r(server)
+ }
return server, nil
}