diff options
67 files changed, 2996 insertions, 94 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 0224de69..b20ec4dd 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -64,6 +64,11 @@ jobs: - name: Install Go dependencies run: go mod download + - name: Install protoc + uses: arduino/setup-protoc@v1 + with: + version: '3.17.3' + - name: Run golang tests with coverage run: make test_coverage diff --git a/.golangci.yml b/.golangci.yml index f6ead63e..f623ed70 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -91,3 +91,4 @@ issues: - noctx - gosimple - revive + - gochecknoinits diff --git a/.vscode/settings.json b/.vscode/settings.json index 48749050..e7762292 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,27 +3,36 @@ "addrs", "amqp", "amqpjobs", + "anypb", "boltdb", "codecov", + "Conv", "golangci", "gomemcache", "goridge", "hget", "hset", "INMEMORY", + "Itestdata", "memcachedkv", "memorykv", "mexpire", "mget", "prefetch", "proto", + "protobuf", + "protoc", "SETEX", "shivammathur", "srem", "stretchr", + "tmpdir", "unsub", "Upgrader", "websockets", "websocketsv" - ] + ], + "files.associations": { + "Dockerfile-*.yaml": "dockerfile" + } } @@ -17,11 +17,15 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.out -covermode=atomic ./plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.out -covermode=atomic ./plugins/http/config + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_codec.out -covermode=atomic ./plugins/grpc/codec + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_parser.out -covermode=atomic ./plugins/grpc/parser + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_proxy.out -covermode=atomic ./plugins/grpc/proxy go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.out -covermode=atomic ./plugins/server go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.out -covermode=atomic ./plugins/jobs/job go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.out -covermode=atomic ./tests/plugins/jobs go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.out -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/grpc_plugin.out -covermode=atomic ./tests/plugins/grpc go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.out -covermode=atomic ./tests/plugins/broadcast go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.out -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.out -covermode=atomic ./tests/plugins/http @@ -54,12 +58,17 @@ test: ## Run application tests go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server go test -v -race -tags=debug ./plugins/jobs/job + go test -v -race -tags=debug ./plugins/websockets + go test -v -race -tags=debug ./plugins/grpc + go test -v -race -tags=debug ./plugins/grpc/codec + go test -v -race -tags=debug ./plugins/grpc/parser + go test -v -race -tags=debug ./plugins/grpc/proxy go test -v -race -tags=debug ./tests/plugins/jobs go test -v -race -tags=debug ./tests/plugins/kv go test -v -race -tags=debug ./tests/plugins/broadcast go test -v -race -tags=debug ./tests/plugins/websockets - go test -v -race -tags=debug ./plugins/websockets go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./tests/plugins/grpc go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload go test -v -race -tags=debug ./tests/plugins/server @@ -73,3 +82,8 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc docker-compose -f tests/env/docker-compose.yaml down + +generate-proto: + protoc --proto_path=./proto/jobs/v1beta --go_out=./proto/jobs/v1beta jobs.proto + protoc --proto_path=./proto/kv/v1beta --go_out=./proto/kv/v1beta kv.proto + protoc --proto_path=./proto/websockets/v1beta --go_out=./proto/websockets/v1beta websockets.proto
\ No newline at end of file @@ -6,15 +6,15 @@ require ( github.com/Shopify/toxiproxy v2.1.4+incompatible // ========= AWS SDK v2 github.com/aws/aws-sdk-go-v2 v1.9.0 - github.com/aws/aws-sdk-go-v2/config v1.7.0 + github.com/aws/aws-sdk-go-v2/config v1.8.0 github.com/aws/aws-sdk-go-v2/credentials v1.4.0 - github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0 + github.com/aws/aws-sdk-go-v2/service/sqs v1.9.0 github.com/aws/smithy-go v1.8.0 // ===================== github.com/beanstalkd/go-beanstalk v0.1.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cenkalti/backoff/v4 v4.1.1 - github.com/fasthttp/websocket v1.4.3 + github.com/fasthttp/websocket v1.4.3-rc.8 github.com/fatih/color v1.12.0 github.com/go-redis/redis/v8 v8.11.3 github.com/gofiber/fiber/v2 v2.18.0 @@ -37,9 +37,10 @@ require ( go.etcd.io/bbolt v1.3.6 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.19.0 - golang.org/x/net v0.0.0-20210825183410-e898025ed96a + golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e + golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 + google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -56,23 +57,23 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/emicklei/proto v1.9.1 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-ole/go-ole v1.2.5 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-colorable v0.1.8 // indirect - github.com/mattn/go-isatty v0.0.13 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect - github.com/pelletier/go-toml v1.9.3 // indirect + github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 // indirect + github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52 // indirect github.com/spf13/afero v1.6.0 // indirect github.com/spf13/cast v1.4.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -87,7 +88,10 @@ require ( go.uber.org/atomic v1.9.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.5 // indirect - gopkg.in/ini.v1 v1.62.0 // indirect + google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect + gopkg.in/ini.v1 v1.63.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) + +require github.com/golang/protobuf v1.5.2 // indirect @@ -40,6 +40,7 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= @@ -49,7 +50,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM= github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= @@ -59,8 +59,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go-v2 v1.9.0 h1:+S+dSqQCN3MSU5vJRu1HqHrq00cJn6heIMU7X9hcsoo= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2/config v1.7.0 h1:J2cZ7qe+3IpqBEXnHUrFrOjoB9BlsXg7j53vxcl5IVg= -github.com/aws/aws-sdk-go-v2/config v1.7.0/go.mod h1:w9+nMZ7soXCe5nT46Ri354SNhXDQ6v+V5wqDjnZE+GY= +github.com/aws/aws-sdk-go-v2/config v1.8.0 h1:O8EMFBOl6tue5gdJJV6U3Ikyl3lqgx6WrulCYrcy2SQ= +github.com/aws/aws-sdk-go-v2/config v1.8.0/go.mod h1:w9+nMZ7soXCe5nT46Ri354SNhXDQ6v+V5wqDjnZE+GY= github.com/aws/aws-sdk-go-v2/credentials v1.4.0 h1:kmvesfjY861FzlCU9mvAfe01D9aeXcG2ZuC+k9F2YLM= github.com/aws/aws-sdk-go-v2/credentials v1.4.0/go.mod h1:dgGR+Qq7Wjcd4AOAW5Rf5Tnv3+x7ed6kETXyS9WCuAY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 h1:OxTAgH8Y4BXHD6PGCJ8DHx2kaZPCQfSTqmDsdRZFezE= @@ -69,8 +69,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 h1:d95cddM3yTm4qffj3P6EnP+TzX1S github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2/go.mod h1:BQV0agm+JEhqR+2RT5e1XTFIDcAAV0eW6z2trp+iduw= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0 h1:VNJ5NLBteVXEwE2F1zEXVmyIH58mZ6kIQGJoC7C+vkg= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0/go.mod h1:R1KK+vY8AfalhG1AOu5e35pOD2SdoPKQCFLTvnxiohk= -github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0 h1:BI05Jbkaqp5IDxiobr3B59mX07lfpLJDv5NwAEx3wSs= -github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0/go.mod h1:BXA1CVaEd9TBOQ8G2ke7lMWdVggAeh35+h2HDO50z7s= +github.com/aws/aws-sdk-go-v2/service/sqs v1.9.0 h1:g6EHC3RFpgbRR8/Yk6BTbzfPn+E3o6J3zWPrcjvVJTw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.9.0/go.mod h1:BXA1CVaEd9TBOQ8G2ke7lMWdVggAeh35+h2HDO50z7s= github.com/aws/aws-sdk-go-v2/service/sso v1.4.0 h1:sHXMIKYS6YiLPzmKSvDpPmOpJDHxmAUgbiF49YNVztg= github.com/aws/aws-sdk-go-v2/service/sso v1.4.0/go.mod h1:+1fpWnL96DL23aXPpMGbsmKe8jLTEfbjuQoA4WS1VaA= github.com/aws/aws-sdk-go-v2/service/sts v1.7.0 h1:1at4e5P+lvHNl2nUktdM2/v+rpICg/QSEr9TO/uW9vU= @@ -92,6 +92,8 @@ github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQ github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -102,6 +104,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -109,15 +112,18 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/emicklei/proto v1.9.1 h1:MUgjFo5xlMwYv72TnF5xmmdKZ04u+dVbv6wdARv16D8= +github.com/emicklei/proto v1.9.1/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/fasthttp/websocket v1.4.3 h1:qjhRJ/rTy4KB8oBxljEC00SDt6HUY9jLRfM601SUdS4= -github.com/fasthttp/websocket v1.4.3/go.mod h1:5r4oKssgS7W6Zn6mPWap3NWzNPJNzUUh3baWTOhcYQk= +github.com/fasthttp/websocket v1.4.3-rc.8 h1:6P/+ejKdkLC9UhkY7GlShGWYMDBiWQtIECLBTDZ/2LU= +github.com/fasthttp/websocket v1.4.3-rc.8/go.mod h1:4m/MeZnTBQR2coy0HDUpyBXDkgtl2SxO+GZng0EKr6k= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= @@ -260,7 +266,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= @@ -280,8 +285,8 @@ github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA= -github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -314,8 +319,9 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= +github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -350,9 +356,9 @@ github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQD github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8= -github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 h1:N3Af8f13ooDKcIhsmFT7Z05CStZWu4C7Md0uDEy4q6o= github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiAeG/aFYVs2i+Dyosoo7FNcm+Pi8iK6ZUrX8= +github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52 h1:FODZE/jDkENIpW3JiMA9sXBQfNklTfClUNhR9k37dPY= +github.com/savsgio/gotils v0.0.0-20210907153846-c06938798b52/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU= github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -364,6 +370,7 @@ github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfIt github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -398,10 +405,8 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE= github.com/valyala/fasthttp v1.29.0 h1:F5GKpytwFk5OhCuRh6H+d4vZAcEeNAwPTdwQnm6IERY= github.com/valyala/fasthttp v1.29.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= @@ -428,6 +433,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -527,8 +533,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw= -golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f h1:w6wWR0H+nyVpbSAQbzVEIACVyr/h8l/BEkY6Sokc7Eg= +golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -614,8 +620,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c= -golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 h1:GkvMjFtXUmahfDtashnc1mnrCtuBVcwse5QV2lUk/tI= +golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -761,6 +767,7 @@ google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= +google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c h1:wtujag7C+4D6KMoulW9YauvK2lgdvCMS260jsqqBXr0= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -782,6 +789,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= +google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -804,8 +813,9 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.38.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.63.0 h1:2t0h8NA59dpVQpa5Yh8cIcR6nHAeBIEk0zlLVqfw4N4= +gopkg.in/ini.v1 v1.63.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/pkg/events/grpc_event.go b/pkg/events/grpc_event.go new file mode 100644 index 00000000..31ff4957 --- /dev/null +++ b/pkg/events/grpc_event.go @@ -0,0 +1,39 @@ +package events + +import ( + "time" + + "google.golang.org/grpc" +) + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk G = iota + 13000 + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr +) + +type G int64 + +func (ev G) String() string { + switch ev { + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + } + return UnknownEventType +} + +// JobEvent represent job event. +type GRPCEvent struct { + Event G + // Info contains unary call info. + Info *grpc.UnaryServerInfo + // Error associated with event. + Error error + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/plugins/grpc/codec/codec.go b/plugins/grpc/codec/codec.go new file mode 100644 index 00000000..a9d89ac5 --- /dev/null +++ b/plugins/grpc/codec/codec.go @@ -0,0 +1,44 @@ +package codec + +import "google.golang.org/grpc/encoding" + +type RawMessage []byte + +// By default, gRPC registers and uses the "proto" codec, so it is not necessary to do this in your own code to send and receive proto messages. +// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec +const cName string = "proto" +const rm string = "rawMessage" + +func (r RawMessage) Reset() {} +func (RawMessage) ProtoMessage() {} +func (RawMessage) String() string { return rm } + +type Codec struct{ base encoding.Codec } + +// Marshal returns the wire format of v. rawMessages would be returned without encoding. +func (c *Codec) Marshal(v interface{}) ([]byte, error) { + if raw, ok := v.(RawMessage); ok { + return raw, nil + } + + return c.base.Marshal(v) +} + +// Unmarshal parses the wire format into v. rawMessages would not be unmarshalled. +func (c *Codec) Unmarshal(data []byte, v interface{}) error { + if raw, ok := v.(*RawMessage); ok { + *raw = data + return nil + } + + return c.base.Unmarshal(data, v) +} + +func (c *Codec) Name() string { + return cName +} + +// String return codec name. +func (c *Codec) String() string { + return "raw:" + c.base.Name() +} diff --git a/plugins/grpc/codec/codec_test.go b/plugins/grpc/codec/codec_test.go new file mode 100644 index 00000000..60efb072 --- /dev/null +++ b/plugins/grpc/codec/codec_test.go @@ -0,0 +1,79 @@ +package codec + +import ( + "testing" + + json "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" +) + +type jsonCodec struct{} + +func (jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (jsonCodec) Unmarshal(data []byte, v interface{}) error { + return json.Unmarshal(data, v) +} + +func (jsonCodec) Name() string { + return "json" +} + +func TestCodec_String(t *testing.T) { + c := Codec{jsonCodec{}} + + assert.Equal(t, "raw:json", c.String()) + + r := RawMessage{} + r.Reset() + r.ProtoMessage() + assert.Equal(t, "rawMessage", r.String()) +} + +func TestCodec_Unmarshal_ByPass(t *testing.T) { + c := Codec{jsonCodec{}} + + s := struct { + Name string + }{} + + assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) + assert.Equal(t, "name", s.Name) +} + +func TestCodec_Marshal_ByPass(t *testing.T) { + c := Codec{jsonCodec{}} + + s := struct { + Name string + }{ + Name: "name", + } + + d, err := c.Marshal(s) + assert.NoError(t, err) + + assert.Equal(t, `{"Name":"name"}`, string(d)) +} + +func TestCodec_Unmarshal_Raw(t *testing.T) { + c := Codec{jsonCodec{}} + + s := RawMessage{} + + assert.NoError(t, c.Unmarshal([]byte(`{"name":"name"}`), &s)) + assert.Equal(t, `{"name":"name"}`, string(s)) +} + +func TestCodec_Marshal_Raw(t *testing.T) { + c := Codec{jsonCodec{}} + + s := RawMessage(`{"Name":"name"}`) + + d, err := c.Marshal(s) + assert.NoError(t, err) + + assert.Equal(t, `{"Name":"name"}`, string(d)) +} diff --git a/plugins/grpc/config.go b/plugins/grpc/config.go new file mode 100644 index 00000000..fedd4998 --- /dev/null +++ b/plugins/grpc/config.go @@ -0,0 +1,128 @@ +package grpc + +import ( + "math" + "os" + "strings" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" +) + +type Config struct { + Listen string `mapstructure:"listen"` + Proto string `mapstructure:"proto"` + + TLS *TLS + + // Env is environment variables passed to the http pool + Env map[string]string `mapstructure:"env"` + + GrpcPool *pool.Config `mapstructure:"pool"` + MaxSendMsgSize int64 `mapstructure:"max_send_msg_size"` + MaxRecvMsgSize int64 `mapstructure:"max_recv_msg_size"` + MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle"` + MaxConnectionAge time.Duration `mapstructure:"max_connection_age"` + MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace"` + MaxConcurrentStreams int64 `mapstructure:"max_concurrent_streams"` + PingTime time.Duration `mapstructure:"ping_time"` + Timeout time.Duration `mapstructure:"timeout"` +} + +type TLS struct { + Key string + Cert string + RootCA string +} + +func (c *Config) InitDefaults() error { //nolint:gocognit + const op = errors.Op("grpc_plugin_config") + if c.GrpcPool == nil { + c.GrpcPool = &pool.Config{} + } + + c.GrpcPool.InitDefaults() + + if !strings.Contains(c.Listen, ":") { + return errors.E(op, errors.Errorf("mailformed grpc grpc address, provided: %s", c.Listen)) + } + + if c.EnableTLS() { + if _, err := os.Stat(c.TLS.Key); err != nil { + if os.IsNotExist(err) { + return errors.E(op, errors.Errorf("key file '%s' does not exists", c.TLS.Key)) + } + + return errors.E(op, err) + } + + if _, err := os.Stat(c.TLS.Cert); err != nil { + if os.IsNotExist(err) { + return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert)) + } + + return errors.E(op, err) + } + + // RootCA is optional, but if provided - check it + if c.TLS.RootCA != "" { + if _, err := os.Stat(c.TLS.RootCA); err != nil { + if os.IsNotExist(err) { + return errors.E(op, errors.Errorf("root ca path provided, but key file '%s' does not exists", c.TLS.RootCA)) + } + return errors.E(op, err) + } + } + } + + // used to set max time + infinity := time.Duration(math.MaxInt64) + + if c.PingTime == 0 { + c.PingTime = time.Hour * 2 + } + + if c.Timeout == 0 { + c.Timeout = time.Second * 20 + } + + if c.MaxConcurrentStreams == 0 { + c.MaxConcurrentStreams = 10 + } + // set default + if c.MaxConnectionAge == 0 { + c.MaxConnectionAge = infinity + } + + // set default + if c.MaxConnectionIdle == 0 { + c.MaxConnectionIdle = infinity + } + + if c.MaxConnectionAgeGrace == 0 { + c.MaxConnectionAgeGrace = infinity + } + + if c.MaxRecvMsgSize == 0 { + c.MaxRecvMsgSize = 1024 * 1024 * 50 + } else { + c.MaxRecvMsgSize = 1024 * 1024 * c.MaxRecvMsgSize + } + + if c.MaxSendMsgSize == 0 { + c.MaxSendMsgSize = 1024 * 1024 * 50 + } else { + c.MaxSendMsgSize = 1024 * 1024 * c.MaxSendMsgSize + } + + return nil +} + +func (c *Config) EnableTLS() bool { + if c.TLS != nil { + return (c.TLS.RootCA != "" && c.TLS.Key != "" && c.TLS.Cert != "") || (c.TLS.Key != "" && c.TLS.Cert != "") + } + + return false +} diff --git a/plugins/grpc/parser/message.proto b/plugins/grpc/parser/message.proto new file mode 100644 index 00000000..a4012010 --- /dev/null +++ b/plugins/grpc/parser/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package app.namespace; + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/parse.go b/plugins/grpc/parser/parse.go new file mode 100644 index 00000000..d59b0927 --- /dev/null +++ b/plugins/grpc/parser/parse.go @@ -0,0 +1,114 @@ +package parser + +import ( + "bytes" + "io" + "os" + + pp "github.com/emicklei/proto" +) + +// Service contains information about singular GRPC service. +type Service struct { + // Package defines service namespace. + Package string + + // Name defines service name. + Name string + + // Methods list. + Methods []Method +} + +// Method describes singular RPC method. +type Method struct { + // Name is method name. + Name string + + // StreamsRequest defines if method accept stream input. + StreamsRequest bool + + // RequestType defines message name (from the same package) of method input. + RequestType string + + // StreamsReturns defines if method streams result. + StreamsReturns bool + + // ReturnsType defines message name (from the same package) of method return value. + ReturnsType string +} + +// File parses given proto file or returns error. +func File(file string, importPath string) ([]Service, error) { + reader, _ := os.Open(file) + defer reader.Close() + + return parse(reader, importPath) +} + +// Bytes parses string into proto definition. +func Bytes(data []byte) ([]Service, error) { + return parse(bytes.NewBuffer(data), "") +} + +func parse(reader io.Reader, importPath string) ([]Service, error) { + proto, err := pp.NewParser(reader).Parse() + if err != nil { + return nil, err + } + + return parseServices( + proto, + parsePackage(proto), + importPath, + ) +} + +func parsePackage(proto *pp.Proto) string { + for _, e := range proto.Elements { + if p, ok := e.(*pp.Package); ok { + return p.Name + } + } + + return "" +} + +func parseServices(proto *pp.Proto, pkg string, importPath string) ([]Service, error) { + services := make([]Service, 0) + + pp.Walk(proto, pp.WithService(func(service *pp.Service) { + services = append(services, Service{ + Package: pkg, + Name: service.Name, + Methods: parseMethods(service), + }) + })) + + pp.Walk(proto, func(v pp.Visitee) { + if i, ok := v.(*pp.Import); ok { + if im, err := File(importPath+"/"+i.Filename, importPath); err == nil { + services = append(services, im...) + } + } + }) + + return services, nil +} + +func parseMethods(s *pp.Service) []Method { + methods := make([]Method, 0) + for _, e := range s.Elements { + if m, ok := e.(*pp.RPC); ok { + methods = append(methods, Method{ + Name: m.Name, + StreamsRequest: m.StreamsRequest, + RequestType: m.RequestType, + StreamsReturns: m.StreamsReturns, + ReturnsType: m.ReturnsType, + }) + } + } + + return methods +} diff --git a/plugins/grpc/parser/parse_test.go b/plugins/grpc/parser/parse_test.go new file mode 100644 index 00000000..b71c133d --- /dev/null +++ b/plugins/grpc/parser/parse_test.go @@ -0,0 +1,71 @@ +package parser + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseFile(t *testing.T) { + services, err := File("test.proto", "") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseFileWithImportsNestedFolder(t *testing.T) { + services, err := File("./test_nested/test_import.proto", "./test_nested") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseFileWithImports(t *testing.T) { + services, err := File("test_import.proto", ".") + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} + +func TestParseNotFound(t *testing.T) { + _, err := File("test2.proto", "") + assert.Error(t, err) +} + +func TestParseBytes(t *testing.T) { + services, err := Bytes([]byte{}) + assert.NoError(t, err) + assert.Len(t, services, 0) +} + +func TestParseString(t *testing.T) { + services, err := Bytes([]byte(` +syntax = "proto3"; +package app.namespace; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +} + +message Message { + string msg = 1; + int64 value = 2; +} +`)) + assert.NoError(t, err) + assert.Len(t, services, 2) + + assert.Equal(t, "app.namespace", services[0].Package) +} diff --git a/plugins/grpc/parser/pong.proto b/plugins/grpc/parser/pong.proto new file mode 100644 index 00000000..9756fabe --- /dev/null +++ b/plugins/grpc/parser/pong.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test.proto b/plugins/grpc/parser/test.proto new file mode 100644 index 00000000..e2230954 --- /dev/null +++ b/plugins/grpc/parser/test.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; +package app.namespace; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +} + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_import.proto b/plugins/grpc/parser/test_import.proto new file mode 100644 index 00000000..1b954fc1 --- /dev/null +++ b/plugins/grpc/parser/test_import.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; +import "pong.proto"; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/message.proto b/plugins/grpc/parser/test_nested/message.proto new file mode 100644 index 00000000..a4012010 --- /dev/null +++ b/plugins/grpc/parser/test_nested/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package app.namespace; + +message Message { + string msg = 1; + int64 value = 2; +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/pong.proto b/plugins/grpc/parser/test_nested/pong.proto new file mode 100644 index 00000000..9756fabe --- /dev/null +++ b/plugins/grpc/parser/test_nested/pong.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; + +// Pong service. +service PongService { + rpc Pong (stream Message) returns (stream Message) { + } +}
\ No newline at end of file diff --git a/plugins/grpc/parser/test_nested/test_import.proto b/plugins/grpc/parser/test_nested/test_import.proto new file mode 100644 index 00000000..a3a476ba --- /dev/null +++ b/plugins/grpc/parser/test_nested/test_import.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package app.namespace; + +import "message.proto"; +import "pong.proto"; + +// Ping Service. +service PingService { + // Ping Method. + rpc Ping (Message) returns (Message) { + } +} diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go new file mode 100644 index 00000000..7518d352 --- /dev/null +++ b/plugins/grpc/plugin.go @@ -0,0 +1,195 @@ +package grpc + +import ( + "context" + "sync" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/state/process" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding" +) + +const ( + name string = "grpc" + RrGrpc string = "RR_GRPC" +) + +type Plugin struct { + mu *sync.RWMutex + config *Config + gPool pool.Pool + opts []grpc.ServerOption + services []func(server *grpc.Server) + server *grpc.Server + rrServer server.Server + + // events handler + events events.Handler + log logger.Logger +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { + const op = errors.Op("grpc_plugin_init") + + if !cfg.Has(name) { + return errors.E(errors.Disabled) + } + // register the codec + encoding.RegisterCodec(&codec.Codec{}) + + err := cfg.UnmarshalKey(name, &p.config) + if err != nil { + return errors.E(op, err) + } + + err = p.config.InitDefaults() + if err != nil { + return errors.E(op, err) + } + + p.opts = make([]grpc.ServerOption, 0) + p.services = make([]func(server *grpc.Server), 0) + p.events = events.NewEventsHandler() + p.events.AddListener(p.collectGRPCEvents) + p.rrServer = server + + // worker's GRPC mode + if p.config.Env == nil { + p.config.Env = make(map[string]string) + } + p.config.Env[RrGrpc] = "true" + + p.log = log + p.mu = &sync.RWMutex{} + + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("grpc_plugin_serve") + errCh := make(chan error, 1) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + go func() { + var err error + p.mu.Lock() + p.server, err = p.createGRPCserver() + if err != nil { + p.log.Error("create grpc server", "error", err) + errCh <- errors.E(op, err) + return + } + + l, err := utils.CreateListener(p.config.Listen) + if err != nil { + p.log.Error("create grpc listener", "error", err) + errCh <- errors.E(op, err) + } + + // protect serve + p.mu.Unlock() + err = p.server.Serve(l) + if err != nil { + // skip errors when stopping the server + if err == grpc.ErrServerStopped { + return + } + + p.log.Error("grpc server stopped", "error", err) + errCh <- errors.E(op, err) + return + } + }() + + return errCh +} + +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.server != nil { + p.server.Stop() + } + return nil +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return name +} + +func (p *Plugin) Reset() error { + p.mu.Lock() + defer p.mu.Unlock() + const op = errors.Op("grpc_plugin_reset") + + // destroy old pool + p.gPool.Destroy(context.Background()) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (p *Plugin) Workers() []*process.State { + p.mu.RLock() + defer p.mu.RUnlock() + + workers := p.gPool.Workers() + + ps := make([]*process.State, 0, len(workers)) + for i := 0; i < len(workers); i++ { + state, err := process.WorkerProcessState(workers[i]) + if err != nil { + return nil + } + ps = append(ps, state) + } + + return ps +} + +func (p *Plugin) collectGRPCEvents(event interface{}) { + if gev, ok := event.(events.GRPCEvent); ok { + switch gev.Event { + case events.EventUnaryCallOk: + p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + case events.EventUnaryCallErr: + p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + } + } +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go new file mode 100644 index 00000000..0894a7a8 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/main.go @@ -0,0 +1,68 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package main + +import ( + "io" + "io/ioutil" + "os" + + "github.com/spiral/roadrunner/v2/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php" + "google.golang.org/protobuf/proto" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +func main() { + req, err := readRequest(os.Stdin) + if err != nil { + panic(err) + } + + if err = writeResponse(os.Stdout, php.Generate(req)); err != nil { + panic(err) + } +} + +func readRequest(in io.Reader) (*plugin.CodeGeneratorRequest, error) { + data, err := ioutil.ReadAll(in) + if err != nil { + return nil, err + } + + req := new(plugin.CodeGeneratorRequest) + if err = proto.Unmarshal(data, req); err != nil { + return nil, err + } + + return req, nil +} + +func writeResponse(out io.Writer, resp *plugin.CodeGeneratorResponse) error { + data, err := proto.Marshal(resp) + if err != nil { + return err + } + + _, err = out.Write(data) + return err +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go new file mode 100644 index 00000000..03c48ac8 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/generate.go @@ -0,0 +1,57 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package php + +import ( + desc "google.golang.org/protobuf/types/descriptorpb" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +// Generate generates needed service classes +func Generate(req *plugin.CodeGeneratorRequest) *plugin.CodeGeneratorResponse { + resp := &plugin.CodeGeneratorResponse{} + + for _, file := range req.ProtoFile { + for _, service := range file.Service { + resp.File = append(resp.File, generate(req, file, service)) + } + } + + return resp +} + +func generate( + req *plugin.CodeGeneratorRequest, + file *desc.FileDescriptorProto, + service *desc.ServiceDescriptorProto, +) *plugin.CodeGeneratorResponse_File { + return &plugin.CodeGeneratorResponse_File{ + Name: str(filename(file, service.Name)), + Content: str(body(req, file, service)), + } +} + +// helper to convert string into string pointer +func str(str string) *string { + return &str +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go new file mode 100644 index 00000000..32579e33 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/keywords.go @@ -0,0 +1,139 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package php + +import ( + "bytes" + "strings" + "unicode" +) + +// @see https://github.com/protocolbuffers/protobuf/blob/master/php/ext/google/protobuf/protobuf.c#L168 +var reservedKeywords = []string{ + "abstract", "and", "array", "as", "break", + "callable", "case", "catch", "class", "clone", + "const", "continue", "declare", "default", "die", + "do", "echo", "else", "elseif", "empty", + "enddeclare", "endfor", "endforeach", "endif", "endswitch", + "endwhile", "eval", "exit", "extends", "final", + "for", "foreach", "function", "global", "goto", + "if", "implements", "include", "include_once", "instanceof", + "insteadof", "interface", "isset", "list", "namespace", + "new", "or", "print", "private", "protected", + "public", "require", "require_once", "return", "static", + "switch", "throw", "trait", "try", "unset", + "use", "var", "while", "xor", "int", + "float", "bool", "string", "true", "false", + "null", "void", "iterable", +} + +// Check if given name/keyword is reserved by php. +func isReserved(name string) bool { + name = strings.ToLower(name) + for _, k := range reservedKeywords { + if name == k { + return true + } + } + + return false +} + +// generate php namespace or path +func namespace(pkg *string, sep string) string { + if pkg == nil { + return "" + } + + result := bytes.NewBuffer(nil) + for _, p := range strings.Split(*pkg, ".") { + result.WriteString(identifier(p, "")) + result.WriteString(sep) + } + + return strings.Trim(result.String(), sep) +} + +// create php identifier for class or message +func identifier(name string, suffix string) string { + name = Camelize(name) + if suffix != "" { + return name + Camelize(suffix) + } + + return name +} + +func resolveReserved(identifier string, pkg string) string { + if isReserved(strings.ToLower(identifier)) { + if pkg == ".google.protobuf" { + return "GPB" + identifier + } + return "PB" + identifier + } + + return identifier +} + +// Camelize "dino_party" -> "DinoParty" +func Camelize(word string) string { + words := splitAtCaseChangeWithTitlecase(word) + return strings.Join(words, "") +} + +func splitAtCaseChangeWithTitlecase(s string) []string { + words := make([]string, 0) + word := make([]rune, 0) + for _, c := range s { + spacer := isSpacerChar(c) + if len(word) > 0 { + if unicode.IsUpper(c) || spacer { + words = append(words, string(word)) + word = make([]rune, 0) + } + } + if !spacer { + if len(word) > 0 { + word = append(word, unicode.ToLower(c)) + } else { + word = append(word, unicode.ToUpper(c)) + } + } + } + words = append(words, string(word)) + return words +} + +func isSpacerChar(c rune) bool { + switch { + case c == rune("_"[0]): + return true + case c == rune(" "[0]): + return true + case c == rune(":"[0]): + return true + case c == rune("-"[0]): + return true + } + return false +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go new file mode 100644 index 00000000..c1dc3898 --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/ns.go @@ -0,0 +1,103 @@ +package php + +import ( + "bytes" + "fmt" + "strings" + + desc "google.golang.org/protobuf/types/descriptorpb" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +// manages internal name representation of the package +type ns struct { + // Package defines file package. + Package string + + // Root namespace of the package + Namespace string + + // Import declares what namespaces to be imported + Import map[string]string +} + +// newNamespace creates new work namespace. +func newNamespace(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) *ns { + ns := &ns{ + Package: *file.Package, + Namespace: namespace(file.Package, "\\"), + Import: make(map[string]string), + } + + if file.Options != nil && file.Options.PhpNamespace != nil { + ns.Namespace = *file.Options.PhpNamespace + } + + for k := range service.Method { + ns.importMessage(req, service.Method[k].InputType) + ns.importMessage(req, service.Method[k].OutputType) + } + + return ns +} + +// importMessage registers new import message namespace (only the namespace). +func (ns *ns) importMessage(req *plugin.CodeGeneratorRequest, msg *string) { + if msg == nil { + return + } + + chunks := strings.Split(*msg, ".") + pkg := strings.Join(chunks[:len(chunks)-1], ".") + + result := bytes.NewBuffer(nil) + for _, p := range chunks[:len(chunks)-1] { + result.WriteString(identifier(p, "")) + result.WriteString(`\`) + } + + if pkg == "."+ns.Package { + // root package + return + } + + for _, f := range req.ProtoFile { + if pkg == "."+*f.Package { + if f.Options != nil && f.Options.PhpNamespace != nil { + // custom imported namespace + ns.Import[pkg] = *f.Options.PhpNamespace + return + } + } + } + + ns.Import[pkg] = strings.Trim(result.String(), `\`) +} + +// resolve message alias +func (ns *ns) resolve(msg *string) string { + chunks := strings.Split(*msg, ".") + pkg := strings.Join(chunks[:len(chunks)-1], ".") + + if pkg == "."+ns.Package { + // root message + return identifier(chunks[len(chunks)-1], "") + } + + for iPkg, ns := range ns.Import { + if pkg == iPkg { + // use last namespace chunk + nsChunks := strings.Split(ns, `\`) + identifier := identifier(chunks[len(chunks)-1], "") + + return fmt.Sprintf( + `%s\%s`, + nsChunks[len(nsChunks)-1], + resolveReserved(identifier, pkg), + ) + } + } + + // fully clarified name (fallback) + return "\\" + namespace(msg, "\\") +} diff --git a/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go new file mode 100644 index 00000000..e00c6fdd --- /dev/null +++ b/plugins/grpc/protoc_plugins/protoc-gen-php-grpc/php/template.go @@ -0,0 +1,103 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package php + +import ( + "bytes" + "fmt" + "strings" + "text/template" + + desc "google.golang.org/protobuf/types/descriptorpb" + plugin "google.golang.org/protobuf/types/pluginpb" +) + +const phpBody = `<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: {{ .File.Name }} +{{ $ns := .Namespace -}} +{{if $ns.Namespace}} +namespace {{ $ns.Namespace }}; +{{end}} +use Spiral\GRPC; +{{- range $n := $ns.Import}} +use {{ $n }}; +{{- end}} + +interface {{ .Service.Name | interface }} extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "{{ .File.Package }}.{{ .Service.Name }}";{{ "\n" }} +{{- range $m := .Service.Method}} + /** + * @param GRPC\ContextInterface $ctx + * @param {{ name $ns $m.InputType }} $in + * @return {{ name $ns $m.OutputType }} + * + * @throws GRPC\Exception\InvokeException + */ + public function {{ $m.Name }}(GRPC\ContextInterface $ctx, {{ name $ns $m.InputType }} $in): {{ name $ns $m.OutputType }}; +{{end -}} +} +` + +// generate php filename +func filename(file *desc.FileDescriptorProto, name *string) string { + ns := namespace(file.Package, "/") + if file.Options != nil && file.Options.PhpNamespace != nil { + ns = strings.ReplaceAll(*file.Options.PhpNamespace, `\`, `/`) + } + + return fmt.Sprintf("%s/%s.php", ns, identifier(*name, "interface")) +} + +// generate php file body +func body(req *plugin.CodeGeneratorRequest, file *desc.FileDescriptorProto, service *desc.ServiceDescriptorProto) string { + out := bytes.NewBuffer(nil) + + data := struct { + Namespace *ns + File *desc.FileDescriptorProto + Service *desc.ServiceDescriptorProto + }{ + Namespace: newNamespace(req, file, service), + File: file, + Service: service, + } + + tpl := template.Must(template.New("phpBody").Funcs(template.FuncMap{ + "interface": func(name *string) string { + return identifier(*name, "interface") + }, + "name": func(ns *ns, name *string) string { + return ns.resolve(name) + }, + }).Parse(phpBody)) + + err := tpl.Execute(out, data) + if err != nil { + panic(err) + } + + return out.String() +} diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go new file mode 100644 index 00000000..074aac85 --- /dev/null +++ b/plugins/grpc/proxy/proxy.go @@ -0,0 +1,219 @@ +package proxy + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +const ( + peerAddr string = ":peer.address" + peerAuthType string = ":peer.auth-type" + delimiter string = "|:|" +) + +// base interface for Proxy class +type proxyService interface { + // RegisterMethod registers new RPC method. + RegisterMethod(method string) + + // ServiceDesc returns service description for the proxy. + ServiceDesc() *grpc.ServiceDesc +} + +// carry details about service, method and RPC context to PHP process +type rpcContext struct { + Service string `json:"service"` + Method string `json:"method"` + Context map[string][]string `json:"context"` +} + +// Proxy manages GRPC/RoadRunner bridge. +type Proxy struct { + mu *sync.RWMutex + grpcPool pool.Pool + name string + metadata string + methods []string +} + +// NewProxy creates new service proxy object. +func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy { + return &Proxy{ + mu: mu, + grpcPool: grpcPool, + name: name, + metadata: metadata, + methods: make([]string, 0), + } +} + +// RegisterMethod registers new RPC method. +func (p *Proxy) RegisterMethod(method string) { + p.methods = append(p.methods, method) +} + +// ServiceDesc returns service description for the proxy. +func (p *Proxy) ServiceDesc() *grpc.ServiceDesc { + desc := &grpc.ServiceDesc{ + ServiceName: p.name, + Metadata: p.metadata, + HandlerType: (*proxyService)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + } + + // Registering methods + for _, m := range p.methods { + desc.Methods = append(desc.Methods, grpc.MethodDesc{ + MethodName: m, + Handler: p.methodHandler(m), + }) + } + + return desc +} + +// Generate method handler proxy. +func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := codec.RawMessage{} + if err := dec(&in); err != nil { + return nil, wrapError(err) + } + + if interceptor == nil { + return p.invoke(ctx, method, in) + } + + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: fmt.Sprintf("/%s/%s", p.name, method), + } + + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return p.invoke(ctx, method, req.(codec.RawMessage)) + } + + return interceptor(ctx, in, info, handler) + } +} + +func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) { + payload, err := p.makePayload(ctx, method, in) + if err != nil { + return nil, err + } + + p.mu.RLock() + resp, err := p.grpcPool.Exec(payload) + p.mu.RUnlock() + + if err != nil { + return nil, wrapError(err) + } + + md, err := p.responseMetadata(resp) + if err != nil { + return nil, err + } + ctx = metadata.NewIncomingContext(ctx, md) + err = grpc.SetHeader(ctx, md) + if err != nil { + return nil, err + } + + return codec.RawMessage(resp.Body), nil +} + +// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD +func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) { + var md metadata.MD + if resp == nil || len(resp.Context) == 0 { + return md, nil + } + + var rpcMetadata map[string]string + err := json.Unmarshal(resp.Context, &rpcMetadata) + if err != nil { + return md, err + } + + if len(rpcMetadata) > 0 { + md = metadata.New(rpcMetadata) + } + + return md, nil +} + +// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error +func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) { + ctxMD := make(map[string][]string) + + if md, ok := metadata.FromIncomingContext(ctx); ok { + for k, v := range md { + ctxMD[k] = v + } + } + + if pr, ok := peer.FromContext(ctx); ok { + ctxMD[peerAddr] = []string{pr.Addr.String()} + if pr.AuthInfo != nil { + ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()} + } + } + + ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD}) + + if err != nil { + return nil, err + } + + return &payload.Payload{Context: ctxData, Body: body}, nil +} + +// mounts proper error code for the error +func wrapError(err error) error { + // internal agreement + if strings.Contains(err.Error(), delimiter) { + chunks := strings.Split(err.Error(), delimiter) + code := codes.Internal + + // protect the slice access + if len(chunks) < 2 { + return err + } + + if phpCode, errConv := strconv.ParseUint(chunks[0], 10, 32); errConv == nil { + code = codes.Code(phpCode) + } + + st := status.New(code, chunks[1]).Proto() + + for _, detailsMessage := range chunks[2:] { + anyDetailsMessage := anypb.Any{} + errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage) + if errP == nil { + st.Details = append(st.Details, &anyDetailsMessage) + } + } + + return status.ErrorProto(st) + } + + return status.Error(codes.Internal, err.Error()) +} diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go new file mode 100644 index 00000000..2c024ee3 --- /dev/null +++ b/plugins/grpc/proxy/proxy_test.go @@ -0,0 +1,134 @@ +package proxy + +// import ( +// "testing" +// "time" + +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/stretchr/testify/assert" +// "golang.org/x/net/context" +// "google.golang.org/grpc" +// "google.golang.org/grpc/codes" +// "google.golang.org/grpc/metadata" +// "google.golang.org/grpc/status" +// ) + +// const addr = "localhost:9080" + +// func Test_Proxy_Error(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) + +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) + +// assert.NoError(t, c.Init(&testCfg{ +// grpcCfg: `{ +// "listen": "tcp://:9080", +// "tls": { +// "key": "tests/server.key", +// "cert": "tests/server.crt" +// }, +// "proto": "tests/test.proto", +// "workers":{ +// "command": "php tests/worker.php", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10, +// "destroyTimeout": 10 +// } +// } +// }`, +// })) + +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) + +// // should do nothing +// s.(*Service).Stop() + +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() + +// cl, cn := getClient(addr) +// defer cn.Close() + +// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"}) + +// assert.Error(t, err) +// se, _ := status.FromError(err) +// assert.Equal(t, "nothing here", se.Message()) +// assert.Equal(t, codes.NotFound, se.Code()) + +// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"}) + +// assert.Error(t, errWithDetails) +// statusWithDetails, _ := status.FromError(errWithDetails) +// assert.Equal(t, "main exception message", statusWithDetails.Message()) +// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code()) + +// details := statusWithDetails.Details() + +// detailsMessageForException := details[0].(*tests.DetailsMessageForException) + +// assert.Equal(t, detailsMessageForException.Code, uint64(1)) +// assert.Equal(t, detailsMessageForException.Message, "details message") +// } + +// func Test_Proxy_Metadata(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) + +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) + +// assert.NoError(t, c.Init(&testCfg{ +// grpcCfg: `{ +// "listen": "tcp://:9080", +// "tls": { +// "key": "tests/server.key", +// "cert": "tests/server.crt" +// }, +// "proto": "tests/test.proto", +// "workers":{ +// "command": "php tests/worker.php", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10, +// "destroyTimeout": 10 +// } +// } +// }`, +// })) + +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) + +// // should do nothing +// s.(*Service).Stop() + +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() + +// cl, cn := getClient(addr) +// defer cn.Close() + +// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value") +// var header metadata.MD +// out, err := cl.Info( +// ctx, +// &tests.Message{Msg: "MD"}, +// grpc.Header(&header), +// grpc.WaitForReady(true), +// ) +// assert.Equal(t, []string{"bar"}, header.Get("foo")) +// assert.NoError(t, err) +// assert.Equal(t, `["proxy-value"]`, out.Msg) +// } diff --git a/plugins/grpc/server.go b/plugins/grpc/server.go new file mode 100644 index 00000000..323f73a0 --- /dev/null +++ b/plugins/grpc/server.go @@ -0,0 +1,154 @@ +package grpc + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "path" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/grpc/parser" + "github.com/spiral/roadrunner/v2/plugins/grpc/proxy" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" +) + +func (p *Plugin) createGRPCserver() (*grpc.Server, error) { + const op = errors.Op("grpc_plugin_create_server") + opts, err := p.serverOptions() + if err != nil { + return nil, errors.E(op, err) + } + + server := grpc.NewServer(opts...) + + if p.config.Proto != "" { + // php proxy services + services, err := parser.File(p.config.Proto, path.Dir(p.config.Proto)) + if err != nil { + return nil, err + } + + for _, service := range services { + p := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto, p.gPool, p.mu) + for _, m := range service.Methods { + p.RegisterMethod(m.Name) + } + + server.RegisterService(p.ServiceDesc(), p) + } + } + + // external and native services + for _, r := range p.services { + r(server) + } + + return server, nil +} + +func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + start := time.Now() + resp, err := handler(ctx, req) + if err != nil { + p.events.Push(events.GRPCEvent{ + Event: events.EventUnaryCallErr, + Info: info, + Error: err, + Start: start, + Elapsed: time.Since(start), + }) + + return nil, err + } + + p.events.Push(events.GRPCEvent{ + Event: events.EventUnaryCallOk, + Info: info, + Start: start, + Elapsed: time.Since(start), + }) + + return resp, nil +} + +func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) { + const op = errors.Op("grpc_plugin_server_options") + + var tcreds credentials.TransportCredentials + var opts []grpc.ServerOption + var cert tls.Certificate + var certPool *x509.CertPool + var rca []byte + var err error + + if p.config.EnableTLS() { + // if client CA is not empty we combine it with Cert and Key + if p.config.TLS.RootCA != "" { + cert, err = tls.LoadX509KeyPair(p.config.TLS.Cert, p.config.TLS.Key) + if err != nil { + return nil, err + } + + certPool, err = x509.SystemCertPool() + if err != nil { + return nil, err + } + if certPool == nil { + certPool = x509.NewCertPool() + } + + rca, err = os.ReadFile(p.config.TLS.RootCA) + if err != nil { + return nil, err + } + + if ok := certPool.AppendCertsFromPEM(rca); !ok { + return nil, errors.E(op, errors.Str("could not append Certs from PEM")) + } + + tcreds = credentials.NewTLS(&tls.Config{ + MinVersion: tls.VersionTLS12, + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{cert}, + ClientCAs: certPool, + }) + } else { + tcreds, err = credentials.NewServerTLSFromFile(p.config.TLS.Cert, p.config.TLS.Key) + if err != nil { + return nil, err + } + } + + serverOptions := []grpc.ServerOption{ + grpc.MaxSendMsgSize(int(p.config.MaxSendMsgSize)), + grpc.MaxRecvMsgSize(int(p.config.MaxRecvMsgSize)), + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: p.config.MaxConnectionIdle, + MaxConnectionAge: p.config.MaxConnectionAge, + MaxConnectionAgeGrace: p.config.MaxConnectionAge, + Time: p.config.PingTime, + Timeout: p.config.Timeout, + }), + grpc.MaxConcurrentStreams(uint32(p.config.MaxConcurrentStreams)), + } + + opts = append(opts, grpc.Creds(tcreds)) + opts = append(opts, serverOptions...) + } + + opts = append(opts, p.opts...) + + // custom codec is required to bypass protobuf, common interceptor used for debug and stats + return append( + opts, + grpc.UnaryInterceptor(p.interceptor), + // TODO(rustatian): check deprecation + // grpc.CustomCodec(&codec{encoding.GetCodec(encCodec)}), + ), nil +} diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go index 243b6c78..201c2d5e 100644 --- a/plugins/http/attributes/attributes.go +++ b/plugins/http/attributes/attributes.go @@ -37,7 +37,7 @@ func (v attrs) del(key string) { delete(v, key) } -// Init returns request with new context and attribute bag. +// Init is idempotent returns request with new context and attribute bag. func Init(r *http.Request) *http.Request { // do not overwrite psr attributes if val := r.Context().Value(PsrContextKey); val == nil { diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index bd8e3b43..4c237ffa 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -487,7 +487,7 @@ func (x *Stats) GetStats() []*Stat { return nil } -// Stats used as a response for the Stats RPC call +// Stat used as a response for the Stats RPC call type Stat struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go index 1e38fe12..19621735 100644 --- a/proto/kv/v1beta/kv.pb.go +++ b/proto/kv/v1beta/kv.pb.go @@ -7,11 +7,10 @@ package kvv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go index b07c271e..188dcf08 100644 --- a/proto/websockets/v1beta/websockets.pb.go +++ b/proto/websockets/v1beta/websockets.pb.go @@ -7,11 +7,10 @@ package websocketsv1beta import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/tests/env/Dockerfile-elastic-mq.yaml b/tests/env/Dockerfile-elastic-mq.yaml index e1513450..75d8a8ff 100644 --- a/tests/env/Dockerfile-elastic-mq.yaml +++ b/tests/env/Dockerfile-elastic-mq.yaml @@ -1,8 +1,8 @@ FROM openjdk:16 -ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.2.0.jar / +ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.2.1.jar / COPY custom.conf / -ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.2.0.jar"] +ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.2.1.jar"] EXPOSE 9324 diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml index d93bc5af..8ef2a99b 100644 --- a/tests/env/docker-compose.yaml +++ b/tests/env/docker-compose.yaml @@ -1,44 +1,44 @@ -version: '3' +version: "3" services: - memcached: - image: memcached:latest - ports: - - "127.0.0.1:11211:11211" - redis: - image: redis:6 - ports: - - "127.0.0.1:6379:6379" - redis2: - image: redis:6 - ports: - - "127.0.0.1:6378:6379" + memcached: + image: memcached:latest + ports: + - "127.0.0.1:11211:11211" + redis: + image: redis:6 + ports: + - "127.0.0.1:6379:6379" + redis2: + image: redis:6 + ports: + - "127.0.0.1:6378:6379" - toxicproxy: - image: shopify/toxiproxy - network_mode: "host" + toxicproxy: + image: ghcr.io/shopify/toxiproxy:latest + network_mode: host - beanstalk: - build: - context: . - dockerfile: Dockerfile-beanstalkd.yaml - ports: - - "127.0.0.1:11300:11300" + beanstalk: + build: + context: . + dockerfile: Dockerfile-beanstalkd.yaml + ports: + - "127.0.0.1:11300:11300" - sqs: - build: - context: . - dockerfile: Dockerfile-elastic-mq.yaml - ports: - - "127.0.0.1:9324:9324" + sqs: + build: + context: . + dockerfile: Dockerfile-elastic-mq.yaml + ports: + - "127.0.0.1:9324:9324" - rabbitmq: - image: rabbitmq:3-management - ports: - - "127.0.0.1:15672:15672" - - "127.0.0.1:5672:5672" + rabbitmq: + image: rabbitmq:3-management + ports: + - "127.0.0.1:15672:15672" + - "127.0.0.1:5672:5672" - prometheus: - image: prom/prometheus - ports: - - "9090:9090" + prometheus: + image: prom/prometheus + ports: + - "9090:9090"
\ No newline at end of file diff --git a/tests/plugins/grpc/configs/.rr-grpc-init.yaml b/tests/plugins/grpc/configs/.rr-grpc-init.yaml new file mode 100644 index 00000000..b743a766 --- /dev/null +++ b/tests/plugins/grpc/configs/.rr-grpc-init.yaml @@ -0,0 +1,58 @@ +rpc: + listen: "tcp://127.0.0.1:6001" + +server: + command: "php ../../psr-worker-bench.php" + relay: "pipes" + relay_timeout: "20s" + +# GRPC service configuration +grpc: + # socket to listen + listen: "tcp://localhost:9001" + + # proto root file + proto: "configs/test.proto" + + # max send limit (MB) + max_send_msg_size: 50 + + # max receive limit (MB) + max_recv_msg_size: 50 + + # MaxConnectionIdle is a duration for the amount of time after which an + # idle connection would be closed by sending a GoAway. Idleness duration is + # defined since the most recent time the number of outstanding RPCs became + # zero or the connection establishment. + max_connection_idle: 0s + + # MaxConnectionAge is a duration for the maximum amount of time a + # connection may exist before it will be closed by sending a GoAway. A + # random jitter of +/-10% will be added to MaxConnectionAge to spread out + # connection storms. + max_connection_age: 0s + + # MaxConnectionAgeGrace is an additive period after MaxConnectionAge after + # which the connection will be forcibly closed. + max_connection_age_grace: 0s + + # MaxConnectionAgeGrace is an additive period after MaxConnectionAge after + # which the connection will be forcibly closed. + max_concurrent_streams: 10 + + # After a duration of this time if the server doesn't see any activity it + # pings the client to see if the transport is still alive. + # If set below 1s, a minimum value of 1s will be used instead. + ping_time: 1s + + # After having pinged for keepalive check, the server waits for a duration + # of Timeout and if no activity is seen even after that the connection is + # closed. + timeout: 200s + + # Usual workers pool configuration + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60 diff --git a/tests/plugins/grpc/configs/external.proto b/tests/plugins/grpc/configs/external.proto new file mode 100644 index 00000000..2bbe806e --- /dev/null +++ b/tests/plugins/grpc/configs/external.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; +package tests; + +service External { + rpc Echo (Ping) returns (Pong) { + } + + rpc Empty (EmptyMessage) returns (EmptyMessage) { + + } +} + +message Ping { + int64 value = 1; +} + +message Pong { + int64 value = 1; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/configs/test.pb.go b/tests/plugins/grpc/configs/test.pb.go new file mode 100644 index 00000000..5f30ceb6 --- /dev/null +++ b/tests/plugins/grpc/configs/test.pb.go @@ -0,0 +1,291 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: test.proto + +package __ + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetMsg() string { + if x != nil { + return x.Msg + } + return "" +} + +type EmptyMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *EmptyMessage) Reset() { + *x = EmptyMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EmptyMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmptyMessage) ProtoMessage() {} + +func (x *EmptyMessage) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmptyMessage.ProtoReflect.Descriptor instead. +func (*EmptyMessage) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{1} +} + +type DetailsMessageForException struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code uint64 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *DetailsMessageForException) Reset() { + *x = DetailsMessageForException{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DetailsMessageForException) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DetailsMessageForException) ProtoMessage() {} + +func (x *DetailsMessageForException) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DetailsMessageForException.ProtoReflect.Descriptor instead. +func (*DetailsMessageForException) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{2} +} + +func (x *DetailsMessageForException) GetCode() uint64 { + if x != nil { + return x.Code + } + return 0 +} + +func (x *DetailsMessageForException) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_test_proto protoreflect.FileDescriptor + +var file_test_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0x1b, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, + 0x73, 0x67, 0x22, 0x0e, 0x0a, 0x0c, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x4a, 0x0a, 0x1a, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x46, 0x6f, 0x72, 0x45, 0x78, 0x63, 0x65, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, + 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xf6, + 0x01, 0x0a, 0x04, 0x54, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, + 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x2d, 0x0a, 0x05, 0x54, 0x68, 0x72, 0x6f, 0x77, 0x12, 0x10, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x00, 0x12, 0x2b, 0x0a, 0x03, 0x44, 0x69, 0x65, 0x12, 0x10, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x10, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x00, 0x12, 0x2c, 0x0a, 0x04, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x10, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, + 0x36, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x15, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x15, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x42, 0x05, 0x5a, 0x03, 0x2e, 0x2f, 0x3b, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_proto_rawDescOnce sync.Once + file_test_proto_rawDescData = file_test_proto_rawDesc +) + +func file_test_proto_rawDescGZIP() []byte { + file_test_proto_rawDescOnce.Do(func() { + file_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_proto_rawDescData) + }) + return file_test_proto_rawDescData +} + +var file_test_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_test_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: service.Message + (*EmptyMessage)(nil), // 1: service.EmptyMessage + (*DetailsMessageForException)(nil), // 2: service.DetailsMessageForException +} +var file_test_proto_depIdxs = []int32{ + 0, // 0: service.Test.Echo:input_type -> service.Message + 0, // 1: service.Test.Throw:input_type -> service.Message + 0, // 2: service.Test.Die:input_type -> service.Message + 0, // 3: service.Test.Info:input_type -> service.Message + 1, // 4: service.Test.Ping:input_type -> service.EmptyMessage + 0, // 5: service.Test.Echo:output_type -> service.Message + 0, // 6: service.Test.Throw:output_type -> service.Message + 0, // 7: service.Test.Die:output_type -> service.Message + 0, // 8: service.Test.Info:output_type -> service.Message + 1, // 9: service.Test.Ping:output_type -> service.EmptyMessage + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_test_proto_init() } +func file_test_proto_init() { + if File_test_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EmptyMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DetailsMessageForException); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_test_proto_goTypes, + DependencyIndexes: file_test_proto_depIdxs, + MessageInfos: file_test_proto_msgTypes, + }.Build() + File_test_proto = out.File + file_test_proto_rawDesc = nil + file_test_proto_goTypes = nil + file_test_proto_depIdxs = nil +} diff --git a/tests/plugins/grpc/configs/test.proto b/tests/plugins/grpc/configs/test.proto new file mode 100644 index 00000000..2e1c90a9 --- /dev/null +++ b/tests/plugins/grpc/configs/test.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package service; +option go_package = "./;"; + +service Test { + rpc Echo (Message) returns (Message) { + } + + rpc Throw (Message) returns (Message) { + } + + rpc Die (Message) returns (Message) { + } + + rpc Info (Message) returns (Message) { + } + + rpc Ping (EmptyMessage) returns (EmptyMessage) { + } +} + +message Message { + string msg = 1; +} + +message EmptyMessage { +} + +message DetailsMessageForException { + uint64 code = 1; + string message = 2; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/grpc_plugin_test.go b/tests/plugins/grpc/grpc_plugin_test.go new file mode 100644 index 00000000..b92282f7 --- /dev/null +++ b/tests/plugins/grpc/grpc_plugin_test.go @@ -0,0 +1,89 @@ +package grpc_test + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/grpc" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestGrpcInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-grpc-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &grpc.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &informer.Plugin{}, + &resetter.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + + wg.Wait() +} diff --git a/tests/plugins/grpc/php_server/.rr.yaml b/tests/plugins/grpc/php_server/.rr.yaml new file mode 100644 index 00000000..cc4a9300 --- /dev/null +++ b/tests/plugins/grpc/php_server/.rr.yaml @@ -0,0 +1,22 @@ +grpc: + listen: "tcp://:9001" + proto: "service.proto" + tls: + key: "server.key" + cert: "server.crt" + workers: + command: "php worker.php" + pool: + numWorkers: 4 + +metrics: + address: localhost:2112 + +limit: + interval: 1 + services: + grpc: + maxMemory: 100 + TTL: 0 + idleTTL: 0 + execTTL: 60
\ No newline at end of file diff --git a/tests/plugins/grpc/php_server/composer.json b/tests/plugins/grpc/php_server/composer.json new file mode 100644 index 00000000..b6303291 --- /dev/null +++ b/tests/plugins/grpc/php_server/composer.json @@ -0,0 +1,23 @@ +{ + "name": "app/example-grpc-server", + "description": "Example GRPC Server", + "repositories": [ + { + "type": "path", + "url": "../.." + } + ], + "require": { + "spiral/php-grpc": "*" + }, + "require-dev": { + "grpc/grpc": "^1.36" + }, + "autoload": { + "psr-4": { + "": "src" + } + }, + "minimum-stability": "dev", + "prefer-stable": true +} diff --git a/tests/plugins/grpc/php_server/server.crt b/tests/plugins/grpc/php_server/server.crt new file mode 100644 index 00000000..24d67fd7 --- /dev/null +++ b/tests/plugins/grpc/php_server/server.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICTTCCAdOgAwIBAgIJAOKyUd+llTRKMAoGCCqGSM49BAMCMGMxCzAJBgNVBAYT +AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv +MRMwEQYDVQQKDApSb2FkUnVubmVyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTgw +OTMwMTMzNDUzWhcNMjgwOTI3MTMzNDUzWjBjMQswCQYDVQQGEwJVUzETMBEGA1UE +CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzETMBEGA1UECgwK +Um9hZFJ1bm5lcjESMBAGA1UEAwwJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EE +ACIDYgAEVnbShsM+l5RR3wfWWmGhzuFGwNzKCk7i9xyobDIyBUxG/UUSfj7KKlUX +puDnDEtF5xXcepl744CyIAYFLOXHb5WqI4jCOzG0o9f/00QQ4bQudJOdbqV910QF +C2vb7Fxro1MwUTAdBgNVHQ4EFgQU9xUexnbB6ORKayA7Pfjzs33otsAwHwYDVR0j +BBgwFoAU9xUexnbB6ORKayA7Pfjzs33otsAwDwYDVR0TAQH/BAUwAwEB/zAKBggq +hkjOPQQDAgNoADBlAjEAue3HhR/MUhxoa9tSDBtOJT3FYbDQswrsdqBTz97CGKst +e7XeZ3HMEvEXy0hGGEMhAjAqcD/4k9vViVppgWFtkk6+NFbm+Kw/QeeAiH5FgFSj +8xQcb+b7nPwNLp3JOkXkVd4= +-----END CERTIFICATE----- diff --git a/tests/plugins/grpc/php_server/server.key b/tests/plugins/grpc/php_server/server.key new file mode 100644 index 00000000..7501dd46 --- /dev/null +++ b/tests/plugins/grpc/php_server/server.key @@ -0,0 +1,9 @@ +-----BEGIN EC PARAMETERS----- +BgUrgQQAIg== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDCQP8utxNbHR6xZOLAJgUhn88r6IrPqmN0MsgGJM/jePB+T9UhkmIU8 +PMm2HeScbcugBwYFK4EEACKhZANiAARWdtKGwz6XlFHfB9ZaYaHO4UbA3MoKTuL3 +HKhsMjIFTEb9RRJ+PsoqVRem4OcMS0XnFdx6mXvjgLIgBgUs5cdvlaojiMI7MbSj +1//TRBDhtC50k51upX3XRAULa9vsXGs= +-----END EC PRIVATE KEY----- diff --git a/tests/plugins/grpc/php_server/service.proto b/tests/plugins/grpc/php_server/service.proto new file mode 100644 index 00000000..60ff84a9 --- /dev/null +++ b/tests/plugins/grpc/php_server/service.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; +package service; + +service Echo { + rpc Ping (Message) returns (Message) { + } +} + +message Message { + string msg = 1; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/php_server/src/EchoService.php b/tests/plugins/grpc/php_server/src/EchoService.php new file mode 100644 index 00000000..c2707811 --- /dev/null +++ b/tests/plugins/grpc/php_server/src/EchoService.php @@ -0,0 +1,17 @@ +<?php +/** + * Sample GRPC PHP server. + */ + +use Spiral\GRPC\ContextInterface; +use Service\EchoInterface; +use Service\Message; + +class EchoService implements EchoInterface +{ + public function Ping(ContextInterface $ctx, Message $in): Message + { + $out = new Message(); + return $out->setMsg(strtoupper($in->getMsg())); + } +}
\ No newline at end of file diff --git a/tests/plugins/grpc/php_server/src/GPBMetadata/Service.php b/tests/plugins/grpc/php_server/src/GPBMetadata/Service.php new file mode 100644 index 00000000..c1b65b21 --- /dev/null +++ b/tests/plugins/grpc/php_server/src/GPBMetadata/Service.php @@ -0,0 +1,27 @@ +<?php +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: service.proto + +namespace GPBMetadata; + +class Service +{ + public static $is_initialized = false; + + public static function initOnce() { + $pool = \Google\Protobuf\Internal\DescriptorPool::getGeneratedPool(); + + if (static::$is_initialized == true) { + return; + } + $pool->internalAddGeneratedFile(hex2bin( + "0a6e0a0d736572766963652e70726f746f12077365727669636522160a07" . + "4d657373616765120b0a036d736718012001280932340a044563686f122c" . + "0a0450696e6712102e736572766963652e4d6573736167651a102e736572" . + "766963652e4d6573736167652200620670726f746f33" + )); + + static::$is_initialized = true; + } +} + diff --git a/tests/plugins/grpc/php_server/src/Service/EchoInterface.php b/tests/plugins/grpc/php_server/src/Service/EchoInterface.php new file mode 100644 index 00000000..5f336ace --- /dev/null +++ b/tests/plugins/grpc/php_server/src/Service/EchoInterface.php @@ -0,0 +1,22 @@ +<?php +# Generated by the protocol buffer compiler (spiral/grpc). DO NOT EDIT! +# source: service.proto + +namespace Service; + +use Spiral\GRPC; + +interface EchoInterface extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "service.Echo"; + + /** + * @param GRPC\ContextInterface $ctx + * @param Message $in + * @return Message + * + * @throws GRPC\Exception\InvokeException + */ + public function Ping(GRPC\ContextInterface $ctx, Message $in): Message; +} diff --git a/tests/plugins/grpc/php_server/src/Service/Message.php b/tests/plugins/grpc/php_server/src/Service/Message.php new file mode 100644 index 00000000..6c40c879 --- /dev/null +++ b/tests/plugins/grpc/php_server/src/Service/Message.php @@ -0,0 +1,58 @@ +<?php +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: service.proto + +namespace Service; + +use Google\Protobuf\Internal\GPBType; +use Google\Protobuf\Internal\RepeatedField; +use Google\Protobuf\Internal\GPBUtil; + +/** + * Generated from protobuf message <code>service.Message</code> + */ +class Message extends \Google\Protobuf\Internal\Message +{ + /** + * Generated from protobuf field <code>string msg = 1;</code> + */ + private $msg = ''; + + /** + * Constructor. + * + * @param array $data { + * Optional. Data for populating the Message object. + * + * @type string $msg + * } + */ + public function __construct($data = NULL) { + \GPBMetadata\Service::initOnce(); + parent::__construct($data); + } + + /** + * Generated from protobuf field <code>string msg = 1;</code> + * @return string + */ + public function getMsg() + { + return $this->msg; + } + + /** + * Generated from protobuf field <code>string msg = 1;</code> + * @param string $var + * @return $this + */ + public function setMsg($var) + { + GPBUtil::checkString($var, True); + $this->msg = $var; + + return $this; + } + +} + diff --git a/tests/plugins/grpc/php_server/worker-grpc.php b/tests/plugins/grpc/php_server/worker-grpc.php new file mode 100644 index 00000000..683a2341 --- /dev/null +++ b/tests/plugins/grpc/php_server/worker-grpc.php @@ -0,0 +1,26 @@ +<?php +/** + * Sample GRPC PHP server. + */ + +use Service\EchoInterface; +use Spiral\Goridge\StreamRelay; +use Spiral\GRPC\Server; +use Spiral\RoadRunner\Worker; + +require __DIR__ . '/vendor/autoload.php'; + +$server = new Server(null, [ + 'debug' => false, // optional (default: false) +]); + +$server->registerService(EchoInterface::class, new EchoService()); + +$worker = \method_exists(Worker::class, 'create') + // RoadRunner >= 2.x + ? Worker::create() + // RoadRunner 1.x + : new Worker(new StreamRelay(STDIN, STDOUT)) +; + +$server->serve($worker); diff --git a/tests/plugins/grpc/plugin_test.go b/tests/plugins/grpc/plugin_test.go new file mode 100644 index 00000000..cfbe0121 --- /dev/null +++ b/tests/plugins/grpc/plugin_test.go @@ -0,0 +1,178 @@ +package grpc + +import ( + "io/ioutil" + "os" + "os/exec" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func init() { + err := build() + if err != nil { + panic(err) + } +} + +func build() error { + cmd := exec.Command("go", "build", "-o", "plugin", "../../../plugins/grpc/protoc_plugins/protoc-gen-php-grpc") + return cmd.Run() +} + +func protoc(t *testing.T, args []string) { + cmd := exec.Command("protoc", "--plugin=protoc-gen-php-grpc=./plugin") + cmd.Args = append(cmd.Args, args...) + out, err := cmd.CombinedOutput() + + if len(out) > 0 || err != nil { + t.Log("RUNNING: ", strings.Join(cmd.Args, " ")) + } + + if len(out) > 0 { + t.Log(string(out)) + } + + if err != nil { + t.Fatalf("protoc: %v", err) + } +} + +func Test_Simple(t *testing.T) { + workdir, _ := os.Getwd() + tmpdir, err := ioutil.TempDir("", "proto-test") + require.NoError(t, err) + + defer func() { + assert.NoError(t, os.RemoveAll(tmpdir)) + }() + + args := []string{ + "-Itestdata", + "--php-grpc_out=" + tmpdir, + "simple/simple.proto", + } + + protoc(t, args) + + assertEqualFiles( + t, + workdir+"/testdata/simple/TestSimple/SimpleServiceInterface.php", + tmpdir+"/TestSimple/SimpleServiceInterface.php", + ) +} + +func Test_PhpNamespaceOption(t *testing.T) { + workdir, _ := os.Getwd() + tmpdir, err := ioutil.TempDir("", "proto-test") + require.NoError(t, err) + + defer func() { + assert.NoError(t, os.RemoveAll(tmpdir)) + }() + + args := []string{ + "-Itestdata", + "--php-grpc_out=" + tmpdir, + "php_namespace/service.proto", + } + protoc(t, args) + + assertEqualFiles( + t, + workdir+"/testdata/php_namespace/Test/CustomNamespace/ServiceInterface.php", + tmpdir+"/Test/CustomNamespace/ServiceInterface.php", + ) +} + +func Test_UseImportedMessage(t *testing.T) { + workdir, _ := os.Getwd() + tmpdir, err := ioutil.TempDir("", "proto-test") + require.NoError(t, err) + + defer func() { + assert.NoError(t, os.RemoveAll(tmpdir)) + }() + + args := []string{ + "-Itestdata", + "--php-grpc_out=" + tmpdir, + "import/service.proto", + } + protoc(t, args) + + assertEqualFiles( + t, + workdir+"/testdata/import/Import/ServiceInterface.php", + tmpdir+"/Import/ServiceInterface.php", + ) +} + +func Test_PhpNamespaceOptionInUse(t *testing.T) { + workdir, _ := os.Getwd() + tmpdir, err := ioutil.TempDir("", "proto-test") + require.NoError(t, err) + + defer func() { + assert.NoError(t, os.RemoveAll(tmpdir)) + }() + + args := []string{ + "-Itestdata", + "--php-grpc_out=" + tmpdir, + "import_custom/service.proto", + } + protoc(t, args) + + assertEqualFiles( + t, + workdir+"/testdata/import_custom/Test/CustomImport/ServiceInterface.php", + tmpdir+"/Test/CustomImport/ServiceInterface.php", + ) +} + +func Test_UseOfGoogleEmptyMessage(t *testing.T) { + workdir, _ := os.Getwd() + tmpdir, err := ioutil.TempDir("", "proto-test") + require.NoError(t, err) + + defer func() { + assert.NoError(t, os.RemoveAll(tmpdir)) + }() + + args := []string{ + "-Itestdata", + "--php-grpc_out=" + tmpdir, + "use_empty/service.proto", + } + protoc(t, args) + + assertEqualFiles( + t, + workdir+"/testdata/use_empty/Test/ServiceInterface.php", + tmpdir+"/Test/ServiceInterface.php", + ) + + assert.NoError(t, os.RemoveAll("plugin")) +} + +func assertEqualFiles(t *testing.T, original, generated string) { + assert.FileExists(t, generated) + + originalData, err := ioutil.ReadFile(original) + if err != nil { + t.Fatal("Can't find original file for comparison") + } + + generatedData, err := ioutil.ReadFile(generated) + if err != nil { + t.Fatal("Can't find generated file for comparison") + } + + // every OS has a special boy + r := strings.NewReplacer("\r\n", "", "\n", "") + assert.Equal(t, r.Replace(string(originalData)), r.Replace(string(generatedData))) +} diff --git a/tests/plugins/grpc/testdata/import/Import/ServiceInterface.php b/tests/plugins/grpc/testdata/import/Import/ServiceInterface.php new file mode 100644 index 00000000..13e58daf --- /dev/null +++ b/tests/plugins/grpc/testdata/import/Import/ServiceInterface.php @@ -0,0 +1,32 @@ +<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: import/service.proto + +namespace Import; + +use Spiral\GRPC; +use Import\Sub; + +interface ServiceInterface extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "import.Service"; + + /** + * @param GRPC\ContextInterface $ctx + * @param Message $in + * @return Message + * + * @throws GRPC\Exception\InvokeException + */ + public function SimpleMethod(GRPC\ContextInterface $ctx, Message $in): Message; + + /** + * @param GRPC\ContextInterface $ctx + * @param Sub\Message $in + * @return Sub\Message + * + * @throws GRPC\Exception\InvokeException + */ + public function ImportMethod(GRPC\ContextInterface $ctx, Sub\Message $in): Sub\Message; +} diff --git a/tests/plugins/grpc/testdata/import/service.proto b/tests/plugins/grpc/testdata/import/service.proto new file mode 100644 index 00000000..5d888f09 --- /dev/null +++ b/tests/plugins/grpc/testdata/import/service.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package import; + +import "import/sub/message.proto"; + +service Service { + rpc SimpleMethod (Message) returns (Message) { + } + + rpc ImportMethod (import.sub.Message) returns (import.sub.Message) { + } +} + +message Message { + int64 id = 1; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/testdata/import/sub/message.proto b/tests/plugins/grpc/testdata/import/sub/message.proto new file mode 100644 index 00000000..1db0313b --- /dev/null +++ b/tests/plugins/grpc/testdata/import/sub/message.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package import.sub; + +message Message { + int64 id = 1; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/testdata/import_custom/Test/CustomImport/ServiceInterface.php b/tests/plugins/grpc/testdata/import_custom/Test/CustomImport/ServiceInterface.php new file mode 100644 index 00000000..b010ce4f --- /dev/null +++ b/tests/plugins/grpc/testdata/import_custom/Test/CustomImport/ServiceInterface.php @@ -0,0 +1,32 @@ +<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: import_custom/service.proto + +namespace Test\CustomImport; + +use Spiral\GRPC; +use Test\CustomImport\Message; + +interface ServiceInterface extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "import.Service"; + + /** + * @param GRPC\ContextInterface $ctx + * @param Message $in + * @return Message + * + * @throws GRPC\Exception\InvokeException + */ + public function SimpleMethod(GRPC\ContextInterface $ctx, Message $in): Message; + + /** + * @param GRPC\ContextInterface $ctx + * @param Message\Message $in + * @return Message\Message + * + * @throws GRPC\Exception\InvokeException + */ + public function ImportMethod(GRPC\ContextInterface $ctx, Message\Message $in): Message\Message; +} diff --git a/tests/plugins/grpc/testdata/import_custom/service.proto b/tests/plugins/grpc/testdata/import_custom/service.proto new file mode 100644 index 00000000..872aaae3 --- /dev/null +++ b/tests/plugins/grpc/testdata/import_custom/service.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package import; + +option php_namespace = "Test\\CustomImport"; + +import "import_custom/sub/message.proto"; + +service Service { + rpc SimpleMethod (Message) returns (Message) { + } + + rpc ImportMethod (import.sub.Message) returns (import.sub.Message) { + } +} + +message Message { + int64 id = 1; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/testdata/import_custom/sub/message.proto b/tests/plugins/grpc/testdata/import_custom/sub/message.proto new file mode 100644 index 00000000..5d722ca3 --- /dev/null +++ b/tests/plugins/grpc/testdata/import_custom/sub/message.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package import.sub; +option php_namespace = "Test\\CustomImport\\Message"; + + +service Service { + rpc AnotherMethod (Message) returns (Message) { + } +} + +message Message { + int64 id = 1; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/testdata/php_namespace/Test/CustomNamespace/ServiceInterface.php b/tests/plugins/grpc/testdata/php_namespace/Test/CustomNamespace/ServiceInterface.php new file mode 100644 index 00000000..2090ba97 --- /dev/null +++ b/tests/plugins/grpc/testdata/php_namespace/Test/CustomNamespace/ServiceInterface.php @@ -0,0 +1,22 @@ +<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: php_namespace/service.proto + +namespace Test\CustomNamespace; + +use Spiral\GRPC; + +interface ServiceInterface extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "testPhpNamespace.Service"; + + /** + * @param GRPC\ContextInterface $ctx + * @param SimpleMessage $in + * @return SimpleMessage + * + * @throws GRPC\Exception\InvokeException + */ + public function SimpleMethod(GRPC\ContextInterface $ctx, SimpleMessage $in): SimpleMessage; +} diff --git a/tests/plugins/grpc/testdata/php_namespace/service.proto b/tests/plugins/grpc/testdata/php_namespace/service.proto new file mode 100644 index 00000000..a3bfa3c0 --- /dev/null +++ b/tests/plugins/grpc/testdata/php_namespace/service.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package testPhpNamespace; + +option php_namespace = "Test\\CustomNamespace"; + +service Service { + rpc SimpleMethod (SimpleMessage) returns (SimpleMessage) { + } +} + +message SimpleMessage { + int32 id = 1; + string name = 2; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/testdata/simple/TestSimple/SimpleServiceInterface.php b/tests/plugins/grpc/testdata/simple/TestSimple/SimpleServiceInterface.php new file mode 100644 index 00000000..f9e84bf7 --- /dev/null +++ b/tests/plugins/grpc/testdata/simple/TestSimple/SimpleServiceInterface.php @@ -0,0 +1,22 @@ +<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: simple/simple.proto + +namespace TestSimple; + +use Spiral\GRPC; + +interface SimpleServiceInterface extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "testSimple.SimpleService"; + + /** + * @param GRPC\ContextInterface $ctx + * @param SimpleMessage $in + * @return SimpleMessage + * + * @throws GRPC\Exception\InvokeException + */ + public function SimpleMethod(GRPC\ContextInterface $ctx, SimpleMessage $in): SimpleMessage; +} diff --git a/tests/plugins/grpc/testdata/simple/simple.proto b/tests/plugins/grpc/testdata/simple/simple.proto new file mode 100644 index 00000000..aca3c1d9 --- /dev/null +++ b/tests/plugins/grpc/testdata/simple/simple.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package testSimple; + +service SimpleService { + rpc SimpleMethod (SimpleMessage) returns (SimpleMessage) { + } +} + +message SimpleMessage { + int32 id = 1; + string name = 2; +}
\ No newline at end of file diff --git a/tests/plugins/grpc/testdata/use_empty/Test/ServiceInterface.php b/tests/plugins/grpc/testdata/use_empty/Test/ServiceInterface.php new file mode 100644 index 00000000..fe6d345a --- /dev/null +++ b/tests/plugins/grpc/testdata/use_empty/Test/ServiceInterface.php @@ -0,0 +1,23 @@ +<?php +# Generated by the protocol buffer compiler (spiral/php-grpc). DO NOT EDIT! +# source: use_empty/service.proto + +namespace Test; + +use Spiral\GRPC; +use Google\Protobuf; + +interface ServiceInterface extends GRPC\ServiceInterface +{ + // GRPC specific service name. + public const NAME = "test.Service"; + + /** + * @param GRPC\ContextInterface $ctx + * @param Protobuf\GPBEmpty $in + * @return Protobuf\GPBEmpty + * + * @throws GRPC\Exception\InvokeException + */ + public function Test(GRPC\ContextInterface $ctx, Protobuf\GPBEmpty $in): Protobuf\GPBEmpty; +} diff --git a/tests/plugins/grpc/testdata/use_empty/service.proto b/tests/plugins/grpc/testdata/use_empty/service.proto new file mode 100644 index 00000000..8c68d8d3 --- /dev/null +++ b/tests/plugins/grpc/testdata/use_empty/service.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package test; + +import "google/protobuf/empty.proto"; + +service Service { + rpc Test (google.protobuf.Empty) returns (google.protobuf.Empty) { + } +}
\ No newline at end of file diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index 80fed8eb..84fbec48 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -27,10 +27,18 @@ import ( func TestDurabilityAMQP(t *testing.T) { client := toxiproxy.NewClient("127.0.0.1:8474") + proxies, err := client.Proxies() + require.NoError(t, err) + + for p := range proxies { + _ = proxies[p].Delete() + } - _, err := client.CreateProxy("redial", "127.0.0.1:23679", "127.0.0.1:5672") + proxy, err := client.CreateProxy("redial", "127.0.0.1:23679", "127.0.0.1:5672") require.NoError(t, err) - defer deleteProxy("redial", t) + defer func() { + _ = proxy.Delete() + }() cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) require.NoError(t, err) diff --git a/tests/plugins/logger/logger_test.go b/tests/plugins/logger/logger_test.go index 05ca2d53..e077f0bc 100644 --- a/tests/plugins/logger/logger_test.go +++ b/tests/plugins/logger/logger_test.go @@ -346,18 +346,6 @@ func TestFileLogger(t *testing.T) { wg.Wait() } -func httpEcho(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:54224?hello=world", nil) - assert.NoError(t, err) - - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusCreated, r.StatusCode) - - err = r.Body.Close() - assert.NoError(t, err) -} - func TestMarshalObjectLogging(t *testing.T) { container, err := endure.NewContainer(nil, endure.RetryOnFail(true), endure.SetLogLevel(endure.ErrorLevel)) if err != nil { @@ -428,3 +416,15 @@ func TestMarshalObjectLogging(t *testing.T) { stopCh <- struct{}{} wg.Wait() } + +func httpEcho(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:54224?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusCreated, r.StatusCode) + + err = r.Body.Close() + assert.NoError(t, err) +} diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index bfdc980b..3e74ca59 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -660,7 +660,7 @@ func RPCWsPubAsync(port string) func(t *testing.T) { }() go func() { - messagesToVerify := make([]string, 0, 10) + messagesToVerify := make([]string, 0, 4) messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`) messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`) messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`) diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index 80fc435c..e809f380 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -1,7 +1,9 @@ <?php + /** * @var Goridge\RelayInterface $relay */ + use Spiral\Goridge; use Spiral\RoadRunner; diff --git a/tests/script.sh b/tests/script.sh new file mode 100755 index 00000000..746fb768 --- /dev/null +++ b/tests/script.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +php ../../../tests/client.php echo pipes
\ No newline at end of file |