summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/build.yml76
-rwxr-xr-x.gitignore3
-rwxr-xr-xMakefile29
-rwxr-xr-xbors.toml6
-rw-r--r--codecov.yml12
-rwxr-xr-xcomposer.json44
-rwxr-xr-xgo.mod7
-rwxr-xr-xgo.sum36
-rw-r--r--phpstan.neon.dist4
-rwxr-xr-xpipe_factory.go2
-rwxr-xr-xpipe_factory_test.go2
-rw-r--r--plugins/checker/tests/plugin_test.go2
-rw-r--r--plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml2
-rw-r--r--plugins/gzip/tests/configs/.rr-http-withGzip.yaml2
-rw-r--r--plugins/http/plugin.go9
-rw-r--r--plugins/http/tests/configs/.rr-h2c.yaml9
-rw-r--r--plugins/http/tests/configs/.rr-ssl-push.yaml8
-rw-r--r--plugins/http/tests/configs/.rr-ssl-redirect.yaml8
-rw-r--r--plugins/http/tests/configs/.rr-ssl.yaml2
-rw-r--r--plugins/http/tests/http_test.go4
-rw-r--r--plugins/http/tests/uploads_test.go4
-rw-r--r--plugins/http/uploads.go6
-rw-r--r--plugins/informer/tests/informer_test.go2
-rw-r--r--plugins/metrics/config.go11
-rw-r--r--plugins/metrics/tests/metrics_test.go2
-rw-r--r--plugins/resetter/tests/resetter_test.go7
-rwxr-xr-xplugins/rpc/config_test.go7
-rwxr-xr-xplugins/rpc/plugin.go2
-rw-r--r--plugins/rpc/tests/plugin2.go2
-rw-r--r--plugins/server/tests/socket.php6
-rw-r--r--plugins/server/tests/tcp.php8
-rwxr-xr-xprotocol.go48
-rwxr-xr-xprotocol_test.go47
-rwxr-xr-xsocket_factory.go4
-rwxr-xr-xsocket_factory_test.go4
-rw-r--r--src/Diactoros/ServerRequestFactory.php28
-rw-r--r--src/Diactoros/StreamFactory.php57
-rw-r--r--src/Diactoros/UploadedFileFactory.php36
-rw-r--r--src/Exception/MetricException.php17
-rw-r--r--src/Exceptions/RoadRunnerException.php18
-rw-r--r--src/HttpClient.php74
-rw-r--r--src/Metrics.php80
-rw-r--r--src/MetricsInterface.php64
-rw-r--r--src/Worker.php178
-rwxr-xr-xstatic_pool.go3
-rwxr-xr-xstatic_pool_test.go14
-rw-r--r--supervisor_test.go3
-rwxr-xr-xsync_worker.go65
-rwxr-xr-xsync_worker_test.go4
-rw-r--r--tests/broken.php4
-rw-r--r--tests/client.php2
-rw-r--r--tests/composer.json12
-rw-r--r--tests/delay.php4
-rw-r--r--tests/echo.php4
-rw-r--r--tests/error.php4
-rw-r--r--tests/head.php4
-rw-r--r--tests/http/client.php14
-rw-r--r--tests/http/slow-client.php18
-rw-r--r--tests/memleak.php6
-rw-r--r--tests/pid.php2
-rw-r--r--tests/psr-worker-bench.php28
-rw-r--r--tests/psr-worker.php (renamed from plugins/gzip/tests/psr-worker.php)15
-rw-r--r--tests/sleep.php6
-rw-r--r--tests/slow-client.php2
-rw-r--r--tests/slow-destroy.php2
-rw-r--r--tests/slow-pid.php2
-rw-r--r--tests/src/Environment.php82
-rw-r--r--tests/src/EnvironmentInterface.php43
-rw-r--r--tests/src/Exception/EnvironmentException.php16
-rw-r--r--tests/src/Exception/RoadRunnerException.php (renamed from src/Exception/RoadRunnerException.php)7
-rw-r--r--tests/src/Http/HttpWorker.php105
-rw-r--r--tests/src/Http/PSR7Worker.php (renamed from src/PSR7Client.php)154
-rw-r--r--tests/src/Http/Request.php48
-rw-r--r--tests/src/Payload.php43
-rw-r--r--tests/src/Worker.php162
-rw-r--r--tests/src/WorkerInterface.php55
-rw-r--r--tests/stop.php2
-rwxr-xr-xutil/isolate.go4
-rwxr-xr-xworker.go81
-rwxr-xr-xworker_watcher.go201
80 files changed, 1114 insertions, 1086 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 8cc0291c..070350a0 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -5,55 +5,16 @@ on:
pull_request:
jobs:
- php:
- name: Build (PHP ${{ matrix.php }}, ${{ matrix.setup }} setup)
- runs-on: ubuntu-20.04
- timeout-minutes: 6
- strategy:
- fail-fast: false
- matrix:
- php: [ '7.4', '8.0' ]
- setup: [ basic, lowest ]
- steps:
- - name: Set up PHP ${{ matrix.php }}
- uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php>
- with:
- php-version: ${{ matrix.php }}
-
- - name: Check out code
- uses: actions/checkout@v2
-
- - name: Syntax check only (lint)
- run: find ./src/ ./tests/ -name "*.php" -print0 | xargs -0 -n1 -P8 php -l
-
- - name: Get Composer Cache Directory
- id: composer-cache
- run: echo "::set-output name=dir::$(composer config cache-files-dir)"
-
- - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer>
- uses: actions/cache@v2
- with:
- path: ${{ steps.composer-cache.outputs.dir }}
- key: ${{ runner.os }}-composer-${{ matrix.setup }}-${{ hashFiles('**/composer.json') }}
- restore-keys: ${{ runner.os }}-composer-
-
- - name: Install lowest Composer dependencies
- if: matrix.setup == 'lowest'
- run: composer update --prefer-dist --no-progress --prefer-lowest --ansi
-
- - name: Install basic Composer dependencies
- if: matrix.setup == 'basic'
- run: composer update --prefer-dist --no-progress --ansi
-
golang:
- name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }})
- runs-on: ubuntu-20.04
- timeout-minutes: 10
+ name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
+ runs-on: ${{ matrix.os }}
+ timeout-minutes: 15
strategy:
fail-fast: false
matrix:
php: [ '7.4', '8.0' ]
go: [ '1.14', '1.15' ]
+ os: [ ubuntu-latest, windows-latest, macos-latest ]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go>
@@ -64,15 +25,18 @@ jobs:
uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php>
with:
php-version: ${{ matrix.php }}
+ extensions: sockets
- name: Check out code
uses: actions/checkout@v2
- name: Get Composer Cache Directory
+ if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }}
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
- name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer>
+ if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }}
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
@@ -80,7 +44,7 @@ jobs:
restore-keys: ${{ runner.os }}-composer-
- name: Install Composer dependencies
- run: composer update --prefer-dist --no-progress --ansi
+ run: cd tests && composer update --prefer-dist --no-progress --ansi
- name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules>
uses: actions/cache@v2
@@ -92,7 +56,28 @@ jobs:
- name: Install Go dependencies
run: go mod download
- - name: Run golang tests
+ - name: Run golang tests on Windows without codecov
+ if: ${{ matrix.os == 'windows-latest' }}
+ run: |
+ go test -v -race -cover -tags=debug .
+ go test -v -race -cover -tags=debug ./plugins/rpc
+ go test -v -race -cover -tags=debug ./plugins/rpc/tests
+ go test -v -race -cover -tags=debug ./plugins/config/tests
+ go test -v -race -cover -tags=debug ./plugins/logger/tests
+ go test -v -race -cover -tags=debug ./plugins/server/tests
+ go test -v -race -cover -tags=debug ./plugins/metrics/tests
+ go test -v -race -cover -tags=debug ./plugins/informer/tests
+ go test -v -race -cover -tags=debug ./plugins/resetter/tests
+ go test -v -race -cover -tags=debug ./plugins/http/attributes
+ go test -v -race -cover -tags=debug ./plugins/http/tests
+ go test -v -race -cover -tags=debug ./plugins/gzip/tests
+ go test -v -race -cover -tags=debug ./plugins/static/tests
+ go test -v -race -cover -tags=debug ./plugins/static
+ go test -v -race -cover -tags=debug ./plugins/headers/tests
+ go test -v -race -cover -tags=debug ./plugins/checker/tests
+
+ - name: Run golang tests on Linux and MacOS
+ if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }}
run: |
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/lib.txt -covermode=atomic .
@@ -114,6 +99,7 @@ jobs:
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
- uses: codecov/codecov-action@v1 # Docs: <https://github.com/codecov/codecov-action>
+ if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }}
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage-ci/summary.txt
diff --git a/.gitignore b/.gitignore
index fb4afc29..78e96559 100755
--- a/.gitignore
+++ b/.gitignore
@@ -21,5 +21,4 @@ vendor
vendor_php
builds/
tests/vendor/
-.rr-sample.yaml
-psr-worker.php
+.rr-sample.yaml \ No newline at end of file
diff --git a/Makefile b/Makefile
index 9ad158ba..e3f00c63 100755
--- a/Makefile
+++ b/Makefile
@@ -24,20 +24,21 @@ uninstall: ## Uninstall locally installed RR
rm -f /usr/local/bin/rr
test: ## Run application tests
- test -d ./vendor_php || composer update --prefer-dist --ansi
- go test -v -race -cover
- go test -v -race -cover ./util
- go test -v -race -cover ./service
- go test -v -race -cover ./service/env
- go test -v -race -cover ./service/rpc
- go test -v -race -cover ./service/http
- go test -v -race -cover ./service/static
- go test -v -race -cover ./service/limit
- go test -v -race -cover ./service/headers
- go test -v -race -cover ./service/metrics
- go test -v -race -cover ./service/health
- go test -v -race -cover ./service/gzip
- go test -v -race -cover ./service/reload
+ go test -v -race -cover -tags=debug -covermode=atomic .
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/config/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/logger/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/server/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/metrics/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/informer/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/resetter/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/http/attributes
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/http/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/gzip/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/static/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/headers/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/checker/tests
lint: ## Run application linters
go fmt ./...
diff --git a/bors.toml b/bors.toml
index e56a268a..73a37428 100755
--- a/bors.toml
+++ b/bors.toml
@@ -1,10 +1,4 @@
status = [
-'Build (PHP 7.3, basic setup)',
-'Build (PHP 7.3, lowest setup)',
-'Build (PHP 8.0, basic setup)',
-'Build (PHP 8.0, lowest setup)',
-'Build (Go 1.14, PHP 7.3)',
-'Build (Go 1.15, PHP 7.3)',
'Build (Go 1.14, PHP 7.4)',
'Build (Go 1.15, PHP 7.4)',
'Build (Go 1.14, PHP 8.0)',
diff --git a/codecov.yml b/codecov.yml
index e34b27ea..75e92a5f 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -1,4 +1,12 @@
coverage:
status:
- project: on
- patch: on \ No newline at end of file
+ project:
+ default:
+ target: auto
+ threshold: 0%
+ informational: true
+ patch:
+ default:
+ target: auto
+ threshold: 0%
+ informational: true \ No newline at end of file
diff --git a/composer.json b/composer.json
deleted file mode 100755
index e3017b97..00000000
--- a/composer.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
- "name": "spiral/roadrunner",
- "type": "server",
- "description": "High-performance PHP application server, load-balancer and process manager written in Golang",
- "license": "MIT",
- "authors": [
- {
- "name": "Anton Titov / Wolfy-J",
- "email": "[email protected]"
- },
- {
- "name": "RoadRunner Community",
- "homepage": "https://github.com/spiral/roadrunner/graphs/contributors"
- }
- ],
- "require": {
- "php": "^7.4 || ^8.0",
- "ext-json": "*",
- "ext-curl": "*",
- "spiral/goridge": "^2.4.2",
- "psr/http-factory": "^1.0.1",
- "psr/http-message": "^1.0.1",
- "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0",
- "laminas/laminas-diactoros": "^1.3.6 || ^2.0",
- "composer/package-versions-deprecated": "^1.8"
- },
- "config": {
- "vendor-dir": "vendor_php"
- },
- "require-dev": {
- "phpstan/phpstan": "~0.12.34"
- },
- "scripts": {
- "analyze": "phpstan analyze -c ./phpstan.neon.dist --no-progress --ansi"
- },
- "autoload": {
- "psr-4": {
- "Spiral\\RoadRunner\\": "src/"
- }
- },
- "bin": [
- "bin/rr"
- ]
-} \ No newline at end of file
diff --git a/go.mod b/go.mod
index b67b0abb..8c4027fb 100755
--- a/go.mod
+++ b/go.mod
@@ -4,20 +4,19 @@ go 1.15
require (
github.com/NYTimes/gziphandler v1.1.1
+ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/fatih/color v1.10.0
+ github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gofiber/fiber/v2 v2.2.3
github.com/golang/mock v1.4.4
github.com/hashicorp/go-multierror v1.1.0
github.com/json-iterator/go v1.1.10
- github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0
github.com/shirou/gopsutil v3.20.11+incompatible
- github.com/sirupsen/logrus v1.6.0
- github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta20
github.com/spiral/errors v1.0.5
- github.com/spiral/goridge/v2 v2.4.6
+ github.com/spiral/goridge/v3 v3.0.0-beta7
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/yookoala/gofast v0.4.0
diff --git a/go.sum b/go.sum
index de83d29c..4196fb34 100755
--- a/go.sum
+++ b/go.sum
@@ -34,7 +34,6 @@ github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
-github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
@@ -48,11 +47,9 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
-github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
-github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -65,7 +62,6 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
-github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -74,7 +70,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
-github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -82,8 +77,6 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
-github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
@@ -165,7 +158,6 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
-github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@@ -236,12 +228,10 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
-github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
-github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
@@ -250,16 +240,11 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
-github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54=
-github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
-github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
-github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
-github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
@@ -285,8 +270,6 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
-github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8=
-github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
@@ -361,7 +344,6 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
-github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
@@ -386,14 +368,11 @@ github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
-github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8=
-github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
-github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spiral/endure v1.0.0-beta20 h1:QD3EJ6CRLgeo/6trfnlUcQhH3vrK8Hvf9ceDpde+yss=
@@ -401,10 +380,10 @@ github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLq
github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
-github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
-github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
-github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4=
-github.com/spiral/roadrunner v1.9.0/go.mod h1:Q1al1YGjs7ZHVkAA7+gUKM0rwk6XWG07G0UjyjjuK+0=
+github.com/spiral/goridge/v3 v3.0.0-beta6 h1:R+MQy93vUWn7zOvdFt8m3WMiTuLoP921IikQpGe9xXo=
+github.com/spiral/goridge/v3 v3.0.0-beta6/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
+github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM=
+github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
@@ -419,7 +398,6 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -428,8 +406,9 @@ github.com/valyala/fasthttp v1.17.0 h1:P8/koH4aSnJ4xbd0cUUFEGQs3jQqIxoDDyRQrUiAk
github.com/valyala/fasthttp v1.17.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
+github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
+github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
-github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo=
github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
@@ -499,13 +478,11 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
@@ -607,6 +584,7 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
diff --git a/phpstan.neon.dist b/phpstan.neon.dist
deleted file mode 100644
index b5fec74d..00000000
--- a/phpstan.neon.dist
+++ /dev/null
@@ -1,4 +0,0 @@
-parameters:
- level: 'max'
- paths:
- - src \ No newline at end of file
diff --git a/pipe_factory.go b/pipe_factory.go
index 3e98bd4e..db00c989 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -5,7 +5,7 @@ import (
"os/exec"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"go.uber.org/multierr"
)
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index bdb861de..c742f522 100755
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -105,7 +105,7 @@ func Test_Pipe_Echo(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go
index c93b68af..a37fc08e 100644
--- a/plugins/checker/tests/plugin_test.go
+++ b/plugins/checker/tests/plugin_test.go
@@ -13,7 +13,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2/interfaces/status"
"github.com/spiral/roadrunner/v2/plugins/checker"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml b/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml
index df02c043..3dc5f9df 100644
--- a/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml
+++ b/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml
@@ -1,5 +1,5 @@
server:
- command: "php psr-worker.php"
+ command: "php ../../../tests/psr-worker.php"
user: ""
group: ""
env:
diff --git a/plugins/gzip/tests/configs/.rr-http-withGzip.yaml b/plugins/gzip/tests/configs/.rr-http-withGzip.yaml
index b91a9aad..38fdfe47 100644
--- a/plugins/gzip/tests/configs/.rr-http-withGzip.yaml
+++ b/plugins/gzip/tests/configs/.rr-http-withGzip.yaml
@@ -1,5 +1,5 @@
server:
- command: "php psr-worker.php"
+ command: "php ../../../tests/psr-worker.php"
user: ""
group: ""
env:
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 371cdb91..a6399489 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -183,6 +183,11 @@ func (s *Plugin) Serve() chan error {
s.fcgi = &http.Server{Handler: s}
}
+ // apply middlewares before starting the server
+ if len(s.mdwr) > 0 {
+ s.addMiddlewares()
+ }
+
if s.http != nil {
go func() {
httpErr := s.http.ListenAndServe()
@@ -217,10 +222,6 @@ func (s *Plugin) Serve() chan error {
}()
}
- if len(s.mdwr) > 0 {
- s.addMiddlewares()
- }
-
return errCh
}
diff --git a/plugins/http/tests/configs/.rr-h2c.yaml b/plugins/http/tests/configs/.rr-h2c.yaml
index 316daea9..d1b24338 100644
--- a/plugins/http/tests/configs/.rr-h2c.yaml
+++ b/plugins/http/tests/configs/.rr-h2c.yaml
@@ -20,15 +20,6 @@ http:
maxJobs: 0
allocateTimeout: 60s
destroyTimeout: 60s
-
- ssl:
- port: 8891
- redirect: false
- cert: fixtures/server.crt
- key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
http2:
enabled: true
h2c: true
diff --git a/plugins/http/tests/configs/.rr-ssl-push.yaml b/plugins/http/tests/configs/.rr-ssl-push.yaml
index 90a99192..02de906a 100644
--- a/plugins/http/tests/configs/.rr-ssl-push.yaml
+++ b/plugins/http/tests/configs/.rr-ssl-push.yaml
@@ -26,10 +26,4 @@ http:
redirect: true
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
- http2:
- enabled: false
- h2c: false
- maxConcurrentStreams: 128 \ No newline at end of file
+ # rootCa: root.crt \ No newline at end of file
diff --git a/plugins/http/tests/configs/.rr-ssl-redirect.yaml b/plugins/http/tests/configs/.rr-ssl-redirect.yaml
index 1878ba53..0ba1753e 100644
--- a/plugins/http/tests/configs/.rr-ssl-redirect.yaml
+++ b/plugins/http/tests/configs/.rr-ssl-redirect.yaml
@@ -26,10 +26,4 @@ http:
redirect: true
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
- http2:
- enabled: false
- h2c: false
- maxConcurrentStreams: 128 \ No newline at end of file
+ # rootCa: root.crt \ No newline at end of file
diff --git a/plugins/http/tests/configs/.rr-ssl.yaml b/plugins/http/tests/configs/.rr-ssl.yaml
index 127c1678..fb54d3fa 100644
--- a/plugins/http/tests/configs/.rr-ssl.yaml
+++ b/plugins/http/tests/configs/.rr-ssl.yaml
@@ -28,7 +28,7 @@ http:
key: fixtures/server.key
# rootCa: root.crt
fcgi:
- address: tcp://0.0.0.0:6920
+ address: tcp://0.0.0.0:16920
http2:
enabled: false
h2c: false
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 06bf3f5d..f68cd42c 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -17,7 +17,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/mocks"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -334,7 +334,7 @@ func sslEcho(t *testing.T) {
}
func fcgiEcho(t *testing.T) {
- fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6920")
+ fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:16920")
fcgiHandler := gofast.NewHandler(
gofast.BasicParamsMap(gofast.BasicSession),
diff --git a/plugins/http/tests/uploads_test.go b/plugins/http/tests/uploads_test.go
index ee244c06..d36d4793 100644
--- a/plugins/http/tests/uploads_test.go
+++ b/plugins/http/tests/uploads_test.go
@@ -269,7 +269,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 200, r.StatusCode)
- fs := fileString(testFile, 5, "application/octet-stream")
+ fs := fileString(testFile, 6, "application/octet-stream")
assert.Equal(t, `{"upload":`+fs+`}`, string(b))
}
@@ -352,7 +352,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 200, r.StatusCode)
- fs := fileString(testFile, 7, "application/octet-stream")
+ fs := fileString(testFile, 8, "application/octet-stream")
assert.Equal(t, `{"upload":`+fs+`}`, string(b))
}
diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go
index 5fddb75d..aeb41591 100644
--- a/plugins/http/uploads.go
+++ b/plugins/http/uploads.go
@@ -18,13 +18,13 @@ const (
UploadErrorNoFile = 4
// UploadErrorNoTmpDir - missing a temporary folder.
- UploadErrorNoTmpDir = 5
+ UploadErrorNoTmpDir = 6
// UploadErrorCantWrite - failed to write file to disk.
- UploadErrorCantWrite = 6
+ UploadErrorCantWrite = 7
// UploadErrorExtension - forbidden file extension.
- UploadErrorExtension = 7
+ UploadErrorExtension = 8
)
// Uploads tree manages uploaded files tree and temporary files.
diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go
index fbe33a7d..9e21e7ea 100644
--- a/plugins/informer/tests/informer_test.go
+++ b/plugins/informer/tests/informer_test.go
@@ -10,7 +10,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go
index 933b7eb8..9459bc9b 100644
--- a/plugins/metrics/config.go
+++ b/plugins/metrics/config.go
@@ -54,6 +54,8 @@ type Collector struct {
Labels []string `json:"labels"`
// Buckets for histogram metric.
Buckets []float64 `json:"buckets"`
+ // Objectives for the summary opts
+ Objectives map[float64]float64 `json:"objectives"`
}
// register application specific metrics.
@@ -109,10 +111,11 @@ func (c *Config) getCollectors() (map[string]prometheus.Collector, error) {
}
case Summary:
opts := prometheus.SummaryOpts{
- Name: name,
- Namespace: m.Namespace,
- Subsystem: m.Subsystem,
- Help: m.Help,
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ Objectives: m.Objectives,
}
if len(m.Labels) != 0 {
diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go
index f9014c95..57b10aa4 100644
--- a/plugins/metrics/tests/metrics_test.go
+++ b/plugins/metrics/tests/metrics_test.go
@@ -12,7 +12,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/metrics"
diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go
index a1873dd4..3bfccf47 100644
--- a/plugins/resetter/tests/resetter_test.go
+++ b/plugins/resetter/tests/resetter_test.go
@@ -10,7 +10,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
@@ -19,7 +19,7 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestInformerInit(t *testing.T) {
+func TestResetterInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
if err != nil {
t.Fatal(err)
@@ -53,7 +53,7 @@ func TestInformerInit(t *testing.T) {
tt := time.NewTimer(time.Second * 15)
- t.Run("InformerRpcTest", resetterRPCTest)
+ t.Run("ResetterRpcTest", resetterRPCTest)
for {
select {
@@ -94,6 +94,7 @@ func resetterRPCTest(t *testing.T) {
var services []string
err = client.Call("resetter.List", nil, &services)
+ assert.NotNil(t, services)
assert.NoError(t, err)
if services[0] != "resetter.plugin1" {
t.Fatal("no enough services")
diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go
index 8b1d974a..67532bc8 100755
--- a/plugins/rpc/config_test.go
+++ b/plugins/rpc/config_test.go
@@ -1,6 +1,7 @@
package rpc
import (
+ "runtime"
"testing"
j "github.com/json-iterator/go"
@@ -29,7 +30,11 @@ func TestConfig_Listener(t *testing.T) {
}()
assert.Equal(t, "tcp", ln.Addr().Network())
- assert.Equal(t, "0.0.0.0:18001", ln.Addr().String())
+ if runtime.GOOS == "windows" {
+ assert.Equal(t, "[::]:18001", ln.Addr().String())
+ } else {
+ assert.Equal(t, "0.0.0.0:18001", ln.Addr().String())
+ }
}
func TestConfig_ListenerUnix(t *testing.T) {
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index 24624d91..c8e63496 100755
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -7,7 +7,7 @@ import (
"github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2/interfaces/log"
rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go
index 854bf097..347e0330 100644
--- a/plugins/rpc/tests/plugin2.go
+++ b/plugins/rpc/tests/plugin2.go
@@ -6,7 +6,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
)
// plugin2 makes a call to the plugin1 via RPC
diff --git a/plugins/server/tests/socket.php b/plugins/server/tests/socket.php
index 143c3ce4..1b76481a 100644
--- a/plugins/server/tests/socket.php
+++ b/plugins/server/tests/socket.php
@@ -6,7 +6,7 @@
use Spiral\Goridge;
use Spiral\RoadRunner;
-require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+require dirname(__DIR__) . "/../../tests/vendor/autoload.php";
$relay = new Goridge\SocketRelay(
"unix.sock",
@@ -16,9 +16,9 @@ $relay = new Goridge\SocketRelay(
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- $rr->send((string)$in);
+ $rr->send((string)$in->body);
} catch (\Throwable $e) {
$rr->error((string)$e);
}
diff --git a/plugins/server/tests/tcp.php b/plugins/server/tests/tcp.php
index 2d6fb00a..c567c982 100644
--- a/plugins/server/tests/tcp.php
+++ b/plugins/server/tests/tcp.php
@@ -6,15 +6,15 @@
use Spiral\Goridge;
use Spiral\RoadRunner;
-require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+require dirname(__DIR__) . "/../../tests/vendor/autoload.php";
$relay = new Goridge\SocketRelay("localhost", 9999);
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- $rr->send((string)$in);
+ $rr->send((string)$in->body);
} catch (\Throwable $e) {
$rr->error((string)$e);
}
-} \ No newline at end of file
+}
diff --git a/protocol.go b/protocol.go
index fe1f23a7..ee2d8245 100755
--- a/protocol.go
+++ b/protocol.go
@@ -5,7 +5,7 @@ import (
j "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
)
var json = j.ConfigCompatibleWithStandardLibrary
@@ -19,9 +19,22 @@ type pidCommand struct {
}
func sendControl(rl goridge.Relay, v interface{}) error {
- const op = errors.Op("send control")
+ const op = errors.Op("send control frame")
+ frame := goridge.NewFrame()
+ frame.WriteVersion(goridge.VERSION_1)
+ frame.WriteFlags(goridge.CONTROL)
+
if data, ok := v.([]byte); ok {
- err := rl.Send(data, goridge.PayloadControl|goridge.PayloadRaw)
+ // check if payload no more that 4Gb
+ if uint32(len(data)) > ^uint32(0) {
+ return errors.E(op, errors.Str("payload is more that 4gb"))
+ }
+
+ frame.WritePayloadLen(uint32(len(data)))
+ frame.WritePayload(data)
+ frame.WriteCRC()
+
+ err := rl.Send(frame)
if err != nil {
return errors.E(op, err)
}
@@ -33,7 +46,16 @@ func sendControl(rl goridge.Relay, v interface{}) error {
return errors.E(op, errors.Errorf("invalid payload: %s", err))
}
- return rl.Send(data, goridge.PayloadControl)
+ frame.WritePayloadLen(uint32(len(data)))
+ frame.WritePayload(data)
+ frame.WriteCRC()
+
+ err = rl.Send(frame)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
func fetchPID(rl goridge.Relay) (int64, error) {
@@ -43,16 +65,26 @@ func fetchPID(rl goridge.Relay) (int64, error) {
return 0, errors.E(op, err)
}
- body, p, err := rl.Receive()
+ frameR := goridge.NewFrame()
+ err = rl.Receive(frameR)
+ if !frameR.VerifyCRC() {
+ return 0, errors.E(op, errors.Str("CRC mismatch"))
+ }
if err != nil {
return 0, errors.E(op, err)
}
- if !p.HasFlag(goridge.PayloadControl) {
- return 0, errors.E(op, errors.Str("unexpected response, header is missing"))
+ if frameR == nil {
+ return 0, errors.E(op, errors.Str("nil frame received"))
+ }
+
+ flags := frameR.ReadFlags()
+
+ if flags&(byte(goridge.CONTROL)) == 0 {
+ return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag"))
}
link := &pidCommand{}
- err = json.Unmarshal(body, link)
+ err = json.Unmarshal(frameR.Payload(), link)
if err != nil {
return 0, errors.E(op, err)
}
diff --git a/protocol_test.go b/protocol_test.go
deleted file mode 100755
index 396ce992..00000000
--- a/protocol_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package roadrunner
-
-import (
- "testing"
-
- "github.com/pkg/errors"
- "github.com/spiral/goridge/v2"
- "github.com/stretchr/testify/assert"
-)
-
-type relayMock struct {
- error bool
- payload string
-}
-
-func (r *relayMock) Send(data []byte, flags byte) (err error) {
- if r.error {
- return errors.New("send error")
- }
-
- return nil
-}
-
-func (r *relayMock) Receive() (data []byte, p goridge.Prefix, err error) {
- return []byte(r.payload), goridge.NewPrefix().WithFlag(goridge.PayloadControl), nil
-}
-
-func (r *relayMock) Close() error {
- return nil
-}
-
-func Test_Protocol_Errors(t *testing.T) {
- err := sendControl(&relayMock{}, make(chan int))
- assert.Error(t, err)
-}
-
-func Test_Protocol_FetchPID(t *testing.T) {
- pid, err := fetchPID(&relayMock{error: false, payload: "{\"pid\":100}"})
- assert.NoError(t, err)
- assert.Equal(t, int64(100), pid)
-
- _, err = fetchPID(&relayMock{error: true, payload: "{\"pid\":100}"})
- assert.Error(t, err)
-
- _, err = fetchPID(&relayMock{error: false, payload: "{\"pid:100"})
- assert.Error(t, err)
-}
diff --git a/socket_factory.go b/socket_factory.go
index 472e5a05..e517c03f 100755
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -10,7 +10,7 @@ import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
@@ -215,7 +215,7 @@ func (f *SocketFactory) findRelay(w WorkerBase) (*goridge.SocketRelay, error) {
}
// chan to store relay associated with specific pid
-func (f *SocketFactory) attachRelayToPid(pid int64, relay *goridge.SocketRelay) {
+func (f *SocketFactory) attachRelayToPid(pid int64, relay goridge.Relay) {
f.relays.Store(pid, relay)
}
diff --git a/socket_factory_test.go b/socket_factory_test.go
index ab6927bd..bbe8cc31 100755
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -254,7 +254,7 @@ func Test_Tcp_Echo(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
@@ -442,7 +442,7 @@ func Test_Unix_Echo(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php
deleted file mode 100644
index 6a42f207..00000000
--- a/src/Diactoros/ServerRequestFactory.php
+++ /dev/null
@@ -1,28 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Diactoros;
-
-use Psr\Http\Message\ServerRequestFactoryInterface;
-use Psr\Http\Message\ServerRequestInterface;
-use Laminas\Diactoros\ServerRequest;
-
-final class ServerRequestFactory implements ServerRequestFactoryInterface
-{
- /**
- * @inheritdoc
- *
- * @param array<mixed> $serverParams Array of SAPI parameters with which to seed the generated request instance.
- */
- public function createServerRequest(string $method, $uri, array $serverParams = []): ServerRequestInterface
- {
- $uploadedFiles = [];
- return new ServerRequest($serverParams, $uploadedFiles, $uri, $method);
- }
-}
diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php
deleted file mode 100644
index 68a77e92..00000000
--- a/src/Diactoros/StreamFactory.php
+++ /dev/null
@@ -1,57 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Diactoros;
-
-use RuntimeException;
-use Psr\Http\Message\StreamFactoryInterface;
-use Psr\Http\Message\StreamInterface;
-use Laminas\Diactoros\Stream;
-
-final class StreamFactory implements StreamFactoryInterface
-{
- /**
- * @inheritdoc
- * @throws RuntimeException
- */
- public function createStream(string $content = ''): StreamInterface
- {
- $resource = fopen('php://temp', 'rb+');
-
- if (! \is_resource($resource)) {
- throw new RuntimeException('Cannot create stream');
- }
-
- fwrite($resource, $content);
- rewind($resource);
- return $this->createStreamFromResource($resource);
- }
-
- /**
- * @inheritdoc
- */
- public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface
- {
- $resource = fopen($file, $mode);
-
- if (! \is_resource($resource)) {
- throw new RuntimeException('Cannot create stream');
- }
-
- return $this->createStreamFromResource($resource);
- }
-
- /**
- * @inheritdoc
- */
- public function createStreamFromResource($resource): StreamInterface
- {
- return new Stream($resource);
- }
-}
diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php
deleted file mode 100644
index daa475c1..00000000
--- a/src/Diactoros/UploadedFileFactory.php
+++ /dev/null
@@ -1,36 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Diactoros;
-
-use Psr\Http\Message\StreamInterface;
-use Psr\Http\Message\UploadedFileFactoryInterface;
-use Psr\Http\Message\UploadedFileInterface;
-use Laminas\Diactoros\UploadedFile;
-
-final class UploadedFileFactory implements UploadedFileFactoryInterface
-{
- /**
- * @inheritdoc
- */
- public function createUploadedFile(
- StreamInterface $stream,
- int $size = null,
- int $error = \UPLOAD_ERR_OK,
- string $clientFilename = null,
- string $clientMediaType = null
- ): UploadedFileInterface {
- if ($size === null) {
- $size = (int) $stream->getSize();
- }
-
- /** @var resource $stream */
- return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType);
- }
-}
diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php
deleted file mode 100644
index d5b738b8..00000000
--- a/src/Exception/MetricException.php
+++ /dev/null
@@ -1,17 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Exception;
-
-use Spiral\Goridge\Exceptions\RPCException;
-
-class MetricException extends RPCException
-{
-}
diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php
deleted file mode 100644
index 43967893..00000000
--- a/src/Exceptions/RoadRunnerException.php
+++ /dev/null
@@ -1,18 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Exceptions;
-
-/**
- * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead
- */
-class RoadRunnerException extends \RuntimeException
-{
-}
diff --git a/src/HttpClient.php b/src/HttpClient.php
deleted file mode 100644
index 9b9048ca..00000000
--- a/src/HttpClient.php
+++ /dev/null
@@ -1,74 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Alex Bond
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-final class HttpClient
-{
- /** @var Worker */
- private $worker;
-
- /**
- * @param Worker $worker
- */
- public function __construct(Worker $worker)
- {
- $this->worker = $worker;
- }
-
- /**
- * @return Worker
- */
- public function getWorker(): Worker
- {
- return $this->worker;
- }
-
- /**
- * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string]
- * or null if termination request or invalid context.
- */
- public function acceptRequest(): ?array
- {
- $body = $this->getWorker()->receive($ctx);
- if (empty($body) && empty($ctx)) {
- // termination request
- return null;
- }
-
- $ctx = json_decode($ctx, true);
- if ($ctx === null) {
- // invalid context
- return null;
- }
-
- return ['ctx' => $ctx, 'body' => $body];
- }
-
- /**
- * Send response to the application server.
- *
- * @param int $status Http status code
- * @param string $body Body of response
- * @param string[][] $headers An associative array of the message's headers. Each
- * key MUST be a header name, and each value MUST be an array of strings
- * for that header.
- */
- public function respond(int $status, string $body, array $headers = []): void
- {
- $sendHeaders = empty($headers)
- ? new \stdClass() // this is required to represent empty header set as map and not as array
- : $headers;
-
- $this->getWorker()->send(
- $body,
- (string) json_encode(['status' => $status, 'headers' => $sendHeaders])
- );
- }
-}
diff --git a/src/Metrics.php b/src/Metrics.php
deleted file mode 100644
index d6b6e1da..00000000
--- a/src/Metrics.php
+++ /dev/null
@@ -1,80 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\Goridge\Exceptions\RPCException;
-use Spiral\Goridge\RPC;
-use Spiral\RoadRunner\Exception\MetricException;
-
-/**
- * Application metrics.
- */
-final class Metrics implements MetricsInterface
-{
- /** @var RPC */
- private $rpc;
-
- /**
- * @param RPC $rpc
- */
- public function __construct(RPC $rpc)
- {
- $this->rpc = $rpc;
- }
-
- /**
- * @inheritDoc
- */
- public function add(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-
- /**
- * @inheritDoc
- */
- public function sub(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-
- /**
- * @inheritDoc
- */
- public function observe(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-
- /**
- * @inheritDoc
- */
- public function set(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-}
diff --git a/src/MetricsInterface.php b/src/MetricsInterface.php
deleted file mode 100644
index ec2009b0..00000000
--- a/src/MetricsInterface.php
+++ /dev/null
@@ -1,64 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\RoadRunner\Exception\MetricException;
-
-interface MetricsInterface
-{
- /**
- * Add collector value. Fallback to appropriate method of related collector.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function add(string $collector, float $value, array $labels = []);
-
- /**
- * Subtract the collector value, only for gauge collector.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function sub(string $collector, float $value, array $labels = []);
-
- /**
- * Observe collector value, only for histogram and summary collectors.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function observe(string $collector, float $value, array $labels = []);
-
- /**
- * Set collector value, only for gauge collector.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function set(string $collector, float $value, array $labels = []);
-}
diff --git a/src/Worker.php b/src/Worker.php
deleted file mode 100644
index d509562e..00000000
--- a/src/Worker.php
+++ /dev/null
@@ -1,178 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\Goridge\Exceptions\GoridgeException;
-use Spiral\Goridge\RelayInterface as Relay;
-use Spiral\Goridge\SendPackageRelayInterface;
-use Spiral\RoadRunner\Exception\RoadRunnerException;
-
-/**
- * Accepts connection from RoadRunner server over given Goridge relay.
- *
- * Example:
- *
- * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
- * while ($task = $worker->receive($context)) {
- * $worker->send("DONE", json_encode($context));
- * }
- */
-class Worker
-{
- // Send as response context to request worker termination
- public const STOP = '{"stop":true}';
-
- /** @var Relay */
- private $relay;
-
- /**
- * @param Relay $relay
- */
- public function __construct(Relay $relay)
- {
- $this->relay = $relay;
- }
-
- /**
- * Receive packet of information to process, returns null when process must be stopped. Might
- * return Error to wrap error message from server.
- *
- * @param mixed $header
- * @return \Error|null|string
- *
- * @throws GoridgeException
- */
- public function receive(&$header)
- {
- $body = $this->relay->receiveSync($flags);
-
- if ($flags & Relay::PAYLOAD_CONTROL) {
- if ($this->handleControl($body, $header, $flags)) {
- // wait for the next command
- return $this->receive($header);
- }
-
- // no context for the termination.
- $header = null;
-
- // Expect process termination
- return null;
- }
-
- if ($flags & Relay::PAYLOAD_ERROR) {
- return new \Error((string)$body);
- }
-
- return $body;
- }
-
- /**
- * Respond to the server with result of task execution and execution context.
- *
- * Example:
- * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders()));
- *
- * @param string|null $payload
- * @param string|null $header
- */
- public function send(string $payload = null, string $header = null): void
- {
- if (!$this->relay instanceof SendPackageRelayInterface) {
- if ($header === null) {
- $this->relay->send('', Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
- } else {
- $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
- }
-
- $this->relay->send((string)$payload, Relay::PAYLOAD_RAW);
- } else {
- $this->relay->sendPackage(
- (string)$header,
- Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW),
- (string)$payload,
- Relay::PAYLOAD_RAW
- );
- }
- }
-
- /**
- * Respond to the server with an error. Error must be treated as TaskError and might not cause
- * worker destruction.
- *
- * Example:
- *
- * $worker->error("invalid payload");
- *
- * @param string $message
- */
- public function error(string $message): void
- {
- $this->relay->send(
- $message,
- Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
- );
- }
-
- /**
- * Terminate the process. Server must automatically pass task to the next available process.
- * Worker will receive StopCommand context after calling this method.
- *
- * Attention, you MUST use continue; after invoking this method to let rr to properly
- * stop worker.
- *
- * @throws GoridgeException
- */
- public function stop(): void
- {
- $this->send(null, self::STOP);
- }
-
- /**
- * Handles incoming control command payload and executes it if required.
- *
- * @param string $body
- * @param mixed $header Exported context (if any).
- * @param int $flags
- * @return bool True when continue processing.
- *
- * @throws RoadRunnerException
- */
- private function handleControl(string $body = null, &$header = null, int $flags = 0): bool
- {
- $header = $body;
- if ($body === null || $flags & Relay::PAYLOAD_RAW) {
- // empty or raw prefix
- return true;
- }
-
- $p = json_decode($body, true);
- if ($p === false) {
- throw new RoadRunnerException('invalid task context, JSON payload is expected');
- }
-
- // PID negotiation (socket connections only)
- if (!empty($p['pid'])) {
- $this->relay->send(
- sprintf('{"pid":%s}', getmypid()),
- Relay::PAYLOAD_CONTROL
- );
- }
-
- // termination request
- if (!empty($p['stop'])) {
- return false;
- }
-
- // parsed header
- $header = $p;
-
- return true;
- }
-}
diff --git a/static_pool.go b/static_pool.go
index b626a499..fbb2e5e8 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -162,7 +162,8 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
}
// worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ // TODO careful with string(rsp.Context)
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sw.State().Set(StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
diff --git a/static_pool_test.go b/static_pool_test.go
index 747f26c4..33799c40 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
@@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
@@ -129,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
- assert.Nil(t, res.Body)
+ assert.Empty(t, res.Body)
assert.NotNil(t, res.Context)
assert.Equal(t, "world", string(res.Context))
@@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
@@ -293,7 +293,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.NotEqual(t, lastPID, string(res.Body))
lastPID = string(res.Body)
@@ -332,7 +332,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.NotEqual(t, lastPID, string(res.Body))
lastPID = string(res.Body)
@@ -372,7 +372,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.NotEqual(t, lastPID, string(res.Body))
lastPID = string(res.Body)
diff --git a/supervisor_test.go b/supervisor_test.go
index 08ea356d..d5d7d04c 100644
--- a/supervisor_test.go
+++ b/supervisor_test.go
@@ -142,7 +142,8 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
})
assert.NoError(t, err)
- assert.Empty(t, resp)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
time.Sleep(time.Second * 1)
// should be the same pid
diff --git a/sync_worker.go b/sync_worker.go
index c4d9aa9d..94a804a7 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "bytes"
"context"
"time"
@@ -8,7 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/util"
"go.uber.org/multierr"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
)
var EmptyPayload = Payload{}
@@ -134,38 +135,60 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
const op = errors.Op("exec payload")
- // two things; todo: merge
- // NO CONTROL HERE
- if err := sendControl(tw.w.Relay(), p.Context); err != nil {
- return EmptyPayload, errors.E(op, err, "header error")
- }
- if err := tw.w.Relay().Send(p.Body, 0); err != nil {
- return EmptyPayload, errors.E(op, err, "sender error")
+ frame := goridge.NewFrame()
+ frame.WriteVersion(goridge.VERSION_1)
+ // can be 0 here
+
+ buf := new(bytes.Buffer)
+ buf.Write(p.Context)
+ buf.Write(p.Body)
+
+ // Context offset
+ frame.WriteOptions(uint32(len(p.Context)))
+ frame.WritePayloadLen(uint32(buf.Len()))
+ frame.WritePayload(buf.Bytes())
+
+ frame.WriteCRC()
+
+ // empty and free the buffer
+ buf.Truncate(0)
+
+ err := tw.Relay().Send(frame)
+ if err != nil {
+ return EmptyPayload, err
}
- var pr goridge.Prefix
- rsp := Payload{}
+ frameR := goridge.NewFrame()
- var err error
- if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.E(op, err, "WorkerProcess error")
+ err = tw.w.Relay().Receive(frameR)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ if frameR == nil {
+ return EmptyPayload, errors.E(op, errors.Str("nil frame received"))
}
- if !pr.HasFlag(goridge.PayloadControl) {
- return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response"))
+ if !frameR.VerifyCRC() {
+ return EmptyPayload, errors.E(op, errors.Str("failed to verify CRC"))
}
- if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context)))
+ flags := frameR.ReadFlags()
+
+ if flags&byte(goridge.ERROR) != byte(0) {
+ return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
- // add streaming support :)
- if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.E(op, err, "WorkerProcess error")
+ options := frameR.ReadOptions()
+ if len(options) != 1 {
+ return EmptyPayload, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- return rsp, nil
+ payload := Payload{}
+ payload.Context = frameR.Payload()[:options[0]]
+ payload.Body = frameR.Payload()[options[0]:]
+
+ return payload, nil
}
func (tw *syncWorker) String() string {
diff --git a/sync_worker_test.go b/sync_worker_test.go
index 30b5bae8..0ef1e0cd 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -40,7 +40,7 @@ func Test_Echo(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
@@ -148,7 +148,7 @@ func Test_Echo_Slow(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
}
diff --git a/tests/broken.php b/tests/broken.php
index 42b4e7c2..1f869b2d 100644
--- a/tests/broken.php
+++ b/tests/broken.php
@@ -8,7 +8,7 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
echo undefined_function();
- $rr->send((string)$in);
+ $rr->send((string)$in->body, null);
}
diff --git a/tests/client.php b/tests/client.php
index 835b1c6c..c00cece1 100644
--- a/tests/client.php
+++ b/tests/client.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/composer.json b/tests/composer.json
new file mode 100644
index 00000000..24702c37
--- /dev/null
+++ b/tests/composer.json
@@ -0,0 +1,12 @@
+{
+ "minimum-stability": "beta",
+ "require": {
+ "nyholm/psr7": "^1.3",
+ "spiral/goridge": "^3.0@beta"
+ },
+ "autoload": {
+ "psr-4": {
+ "Spiral\\RoadRunner\\": "src/"
+ }
+ }
+}
diff --git a/tests/delay.php b/tests/delay.php
index bf9ecc12..f0435b05 100644
--- a/tests/delay.php
+++ b/tests/delay.php
@@ -8,9 +8,9 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- usleep($in * 1000);
+ usleep($in->body * 1000);
$rr->send('');
} catch (\Throwable $e) {
$rr->error((string)$e);
diff --git a/tests/echo.php b/tests/echo.php
index 1570e3df..83eec92e 100644
--- a/tests/echo.php
+++ b/tests/echo.php
@@ -8,9 +8,9 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- $rr->send((string)$in);
+ $rr->send((string)$in->body);
} catch (\Throwable $e) {
$rr->error((string)$e);
}
diff --git a/tests/error.php b/tests/error.php
index 8e1c8d0d..c77e6817 100644
--- a/tests/error.php
+++ b/tests/error.php
@@ -8,6 +8,6 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
- $rr->error((string)$in);
+while ($in = $rr->waitPayload()) {
+ $rr->error((string)$in->body);
}
diff --git a/tests/head.php b/tests/head.php
index 88ebd3f2..3c57258f 100644
--- a/tests/head.php
+++ b/tests/head.php
@@ -8,9 +8,9 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- $rr->send("", (string)$ctx);
+ $rr->send("", (string)$in->header);
} catch (\Throwable $e) {
$rr->error((string)$e);
}
diff --git a/tests/http/client.php b/tests/http/client.php
index 9f21b273..ad5cce24 100644
--- a/tests/http/client.php
+++ b/tests/http/client.php
@@ -4,7 +4,7 @@ use Spiral\Goridge;
use Spiral\RoadRunner;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/../vendor_php/autoload.php";
+require dirname(__DIR__) . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
@@ -33,12 +33,18 @@ switch ($goridge) {
die("invalid protocol selection");
}
-$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ new RoadRunner\Worker($relay),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
require_once sprintf("%s/%s.php", __DIR__, $test);
-while ($req = $psr7->acceptRequest()) {
+while ($req = $psr7->waitRequest()) {
try {
- $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response()));
+ $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response()));
} catch (\Throwable $e) {
$psr7->getWorker()->error((string)$e);
}
diff --git a/tests/http/slow-client.php b/tests/http/slow-client.php
index 4d3963d7..731232f7 100644
--- a/tests/http/slow-client.php
+++ b/tests/http/slow-client.php
@@ -4,13 +4,13 @@ use Spiral\Goridge;
use Spiral\RoadRunner;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/../vendor_php/autoload.php";
+require dirname(__DIR__) . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
}
-list($test, $goridge, $bootDelay) = [$argv[1], $argv[2], $argv[3]];
+[$test, $goridge, $bootDelay] = [$argv[1], $argv[2], $argv[3]];
usleep($bootDelay * 1000);
switch ($goridge) {
@@ -34,13 +34,19 @@ switch ($goridge) {
die("invalid protocol selection");
}
-$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ new RoadRunner\Worker($relay),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
require_once sprintf("%s/%s.php", __DIR__, $test);
-while ($req = $psr7->acceptRequest()) {
+while ($req = $psr7->waitRequest()) {
try {
- $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response()));
+ $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response()));
} catch (\Throwable $e) {
- $psr7->getWorker()->error((string)$e);
+ $psr7->getWorker()->error((string) $e);
}
}
diff --git a/tests/memleak.php b/tests/memleak.php
index b78a76c0..9a5376f0 100644
--- a/tests/memleak.php
+++ b/tests/memleak.php
@@ -5,11 +5,11 @@ declare(strict_types=1);
use Spiral\Goridge\StreamRelay;
use Spiral\RoadRunner\Worker as RoadRunner;
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
$mem = '';
-while($rr->receive($ctx)){
+while($rr->waitPayload()){
$mem .= str_repeat(" ", 1024*1024);
$rr->send("");
-} \ No newline at end of file
+}
diff --git a/tests/pid.php b/tests/pid.php
index bf10a025..f8b2515d 100644
--- a/tests/pid.php
+++ b/tests/pid.php
@@ -8,7 +8,7 @@
$rr = new RoadRunner\Worker($relay);
- while ($in = $rr->receive($ctx)) {
+ while ($in = $rr->waitPayload()) {
try {
$rr->send((string)getmypid());
} catch (\Throwable $e) {
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
new file mode 100644
index 00000000..e6df81ad
--- /dev/null
+++ b/tests/psr-worker-bench.php
@@ -0,0 +1,28 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+ try {
+ $resp = new \Nyholm\Psr7\Response();
+ $resp->getBody()->write("hello world");
+
+ $psr7->respond($resp);
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}
diff --git a/plugins/gzip/tests/psr-worker.php b/tests/psr-worker.php
index ed936bde..db53eee2 100644
--- a/plugins/gzip/tests/psr-worker.php
+++ b/tests/psr-worker.php
@@ -6,18 +6,23 @@ use Spiral\Goridge;
use Spiral\RoadRunner;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-$psr7 = new RoadRunner\PSR7Client($worker);
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
-while ($req = $psr7->acceptRequest()) {
+while ($req = $psr7->waitRequest()) {
try {
- $resp = new \Zend\Diactoros\Response();
+ $resp = new \Nyholm\Psr7\Response();
$resp->getBody()->write(str_repeat("hello world", 1000));
$psr7->respond($resp);
} catch (\Throwable $e) {
$psr7->getWorker()->error((string)$e);
}
-} \ No newline at end of file
+}
diff --git a/tests/sleep.php b/tests/sleep.php
index b3ea8235..e34a6834 100644
--- a/tests/sleep.php
+++ b/tests/sleep.php
@@ -5,11 +5,11 @@ declare(strict_types=1);
use Spiral\Goridge\StreamRelay;
use Spiral\RoadRunner\Worker as RoadRunner;
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
-while($rr->receive($ctx)){
+while($rr->waitPayload()){
sleep(3);
$rr->send("");
-} \ No newline at end of file
+}
diff --git a/tests/slow-client.php b/tests/slow-client.php
index ece0a439..7737f0b1 100644
--- a/tests/slow-client.php
+++ b/tests/slow-client.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/slow-destroy.php b/tests/slow-destroy.php
index e2a01af2..900bb68a 100644
--- a/tests/slow-destroy.php
+++ b/tests/slow-destroy.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/slow-pid.php b/tests/slow-pid.php
index 747e7e86..3660cb40 100644
--- a/tests/slow-pid.php
+++ b/tests/slow-pid.php
@@ -8,7 +8,7 @@
$rr = new RoadRunner\Worker($relay);
- while ($in = $rr->receive($ctx)) {
+ while ($in = $rr->waitPayload()) {
try {
sleep(1);
$rr->send((string)getmypid());
diff --git a/tests/src/Environment.php b/tests/src/Environment.php
new file mode 100644
index 00000000..9b306063
--- /dev/null
+++ b/tests/src/Environment.php
@@ -0,0 +1,82 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\EnvironmentException;
+
+class Environment implements EnvironmentInterface
+{
+ /** @var array */
+ private array $env;
+
+ /**
+ * @param array $env
+ */
+ public function __construct(array $env)
+ {
+ $this->env = $env;
+ }
+
+ /**
+ * Returns worker mode assigned to the PHP process.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getMode(): string
+ {
+ return $this->getValue('RR_MODE');
+ }
+
+ /**
+ * Address worker should be connected to (or pipes).
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRelayAddress(): string
+ {
+ return $this->getValue('RR_RELAY');
+ }
+
+ /**
+ * RPC address.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRPCAddress(): string
+ {
+ return $this->getValue('RR_RPC');
+ }
+
+ /**
+ * @param string $name
+ * @return string
+ * @throws EnvironmentException
+ */
+ private function getValue(string $name): string
+ {
+ if (!isset($this->env[$name])) {
+ throw new EnvironmentException(sprintf("Missing environment value `%s`", $name));
+ }
+
+ return (string) $this->env[$name];
+ }
+
+ /**
+ * @return EnvironmentInterface
+ */
+ public static function fromGlobals(): EnvironmentInterface
+ {
+ return new static(array_merge($_SERVER, $_ENV));
+ }
+}
diff --git a/tests/src/EnvironmentInterface.php b/tests/src/EnvironmentInterface.php
new file mode 100644
index 00000000..bc0ae043
--- /dev/null
+++ b/tests/src/EnvironmentInterface.php
@@ -0,0 +1,43 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\EnvironmentException;
+
+/**
+ * Provides base values to configure roadrunner worker.
+ */
+interface EnvironmentInterface
+{
+ /**
+ * Returns worker mode assigned to the PHP process.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getMode(): string;
+
+ /**
+ * Address worker should be connected to (or pipes).
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRelayAddress(): string;
+
+ /**
+ * RPC address.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRPCAddress(): string;
+}
diff --git a/tests/src/Exception/EnvironmentException.php b/tests/src/Exception/EnvironmentException.php
new file mode 100644
index 00000000..227507c5
--- /dev/null
+++ b/tests/src/Exception/EnvironmentException.php
@@ -0,0 +1,16 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Exception;
+
+class EnvironmentException extends RoadRunnerException
+{
+
+}
diff --git a/src/Exception/RoadRunnerException.php b/tests/src/Exception/RoadRunnerException.php
index f83c3dd4..2329370c 100644
--- a/src/Exception/RoadRunnerException.php
+++ b/tests/src/Exception/RoadRunnerException.php
@@ -1,14 +1,15 @@
<?php
/**
- * High-performance PHP process supervisor and load balancer written in Go
+ * High-performance PHP process supervisor and load balancer written in Go.
*
- * @author Wolfy-J
+ * @author Wolfy-J
*/
+
declare(strict_types=1);
namespace Spiral\RoadRunner\Exception;
-class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException
+class RoadRunnerException extends \RuntimeException
{
}
diff --git a/tests/src/Http/HttpWorker.php b/tests/src/Http/HttpWorker.php
new file mode 100644
index 00000000..ba045d87
--- /dev/null
+++ b/tests/src/Http/HttpWorker.php
@@ -0,0 +1,105 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Alex Bond
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Http;
+
+use Spiral\RoadRunner\WorkerInterface;
+
+class HttpWorker
+{
+ /** @var WorkerInterface */
+ private WorkerInterface $worker;
+
+ /**
+ * @param WorkerInterface $worker
+ */
+ public function __construct(WorkerInterface $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return WorkerInterface
+ */
+ public function getWorker(): WorkerInterface
+ {
+ return $this->worker;
+ }
+
+ /**
+ * Wait for incoming http request.
+ *
+ * @return Request|null
+ */
+ public function waitRequest(): ?Request
+ {
+ $payload = $this->getWorker()->waitPayload();
+ if (empty($payload->body) && empty($payload->header)) {
+ // termination request
+ return null;
+ }
+
+ $request = new Request();
+ $request->body = $payload->body;
+
+ $context = json_decode($payload->header, true);
+ if ($context === null) {
+ // invalid context
+ return null;
+ }
+
+ $this->hydrateRequest($request, $context);
+
+ return $request;
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param int $status Http status code
+ * @param string $body Body of response
+ * @param string[][] $headers An associative array of the message's headers. Each
+ * key MUST be a header name, and each value MUST be an array of strings
+ * for that header.
+ */
+ public function respond(int $status, string $body, array $headers = []): void
+ {
+ if ($headers === []) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->getWorker()->send(
+ $body,
+ (string) json_encode(['status' => $status, 'headers' => $headers])
+ );
+ }
+
+ /**
+ * @param Request $request
+ * @param array $context
+ */
+ private function hydrateRequest(Request $request, array $context): void
+ {
+ $request->remoteAddr = $context['remoteAddr'];
+ $request->protocol = $context['protocol'];
+ $request->method = $context['method'];
+ $request->uri = $context['uri'];
+ $request->attributes = $context['attributes'] ?? [];
+ $request->headers = $context['headers'];
+ $request->cookies = $context['cookies'] ?? [];
+ $request->uploads = $context['uploads'] ?? [];
+
+ $request->query = [];
+ parse_str($context['rawQuery'], $request->query);
+
+ // indicates that body was parsed
+ $request->parsed = $context['parsed'];
+ }
+}
diff --git a/src/PSR7Client.php b/tests/src/Http/PSR7Worker.php
index 777dd891..f16c4847 100644
--- a/src/PSR7Client.php
+++ b/tests/src/Http/PSR7Worker.php
@@ -7,7 +7,7 @@
*/
declare(strict_types=1);
-namespace Spiral\RoadRunner;
+namespace Spiral\RoadRunner\Http;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestFactoryInterface;
@@ -15,100 +15,64 @@ use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\UploadedFileFactoryInterface;
use Psr\Http\Message\UploadedFileInterface;
+use Spiral\RoadRunner\WorkerInterface;
/**
* Manages PSR-7 request and response.
*/
-class PSR7Client
+class PSR7Worker
{
- /** @var HttpClient */
- private $httpClient;
-
- /** @var ServerRequestFactoryInterface */
- private $requestFactory;
-
- /** @var StreamFactoryInterface */
- private $streamFactory;
-
- /** @var UploadedFileFactoryInterface */
- private $uploadsFactory;
+ private HttpWorker $httpWorker;
+ private ServerRequestFactoryInterface $requestFactory;
+ private StreamFactoryInterface $streamFactory;
+ private UploadedFileFactoryInterface $uploadsFactory;
/** @var mixed[] */
- private $originalServer = [];
+ private array $originalServer = [];
/** @var string[] Valid values for HTTP protocol version */
- private static $allowedVersions = ['1.0', '1.1', '2',];
+ private static array $allowedVersions = ['1.0', '1.1', '2',];
/**
- * @param Worker $worker
- * @param ServerRequestFactoryInterface|null $requestFactory
- * @param StreamFactoryInterface|null $streamFactory
- * @param UploadedFileFactoryInterface|null $uploadsFactory
+ * @param WorkerInterface $worker
+ * @param ServerRequestFactoryInterface $requestFactory
+ * @param StreamFactoryInterface $streamFactory
+ * @param UploadedFileFactoryInterface $uploadsFactory
*/
public function __construct(
- Worker $worker,
- ServerRequestFactoryInterface $requestFactory = null,
- StreamFactoryInterface $streamFactory = null,
- UploadedFileFactoryInterface $uploadsFactory = null
+ WorkerInterface $worker,
+ ServerRequestFactoryInterface $requestFactory,
+ StreamFactoryInterface $streamFactory,
+ UploadedFileFactoryInterface $uploadsFactory
) {
- $this->httpClient = new HttpClient($worker);
- $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory();
- $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory();
- $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory();
+ $this->httpWorker = new HttpWorker($worker);
+ $this->requestFactory = $requestFactory;
+ $this->streamFactory = $streamFactory;
+ $this->uploadsFactory = $uploadsFactory;
$this->originalServer = $_SERVER;
}
/**
- * @return Worker
+ * @return WorkerInterface
*/
- public function getWorker(): Worker
+ public function getWorker(): WorkerInterface
{
- return $this->httpClient->getWorker();
+ return $this->httpWorker->getWorker();
}
/**
* @return ServerRequestInterface|null
*/
- public function acceptRequest(): ?ServerRequestInterface
+ public function waitRequest(): ?ServerRequestInterface
{
- $rawRequest = $this->httpClient->acceptRequest();
- if ($rawRequest === null) {
+ $httpRequest = $this->httpWorker->waitRequest();
+ if ($httpRequest === null) {
return null;
}
- $_SERVER = $this->configureServer($rawRequest['ctx']);
-
- $request = $this->requestFactory->createServerRequest(
- $rawRequest['ctx']['method'],
- $rawRequest['ctx']['uri'],
- $_SERVER
- );
-
- parse_str($rawRequest['ctx']['rawQuery'], $query);
-
- $request = $request
- ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol']))
- ->withCookieParams($rawRequest['ctx']['cookies'])
- ->withQueryParams($query)
- ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads']));
-
- foreach ($rawRequest['ctx']['attributes'] as $name => $value) {
- $request = $request->withAttribute($name, $value);
- }
-
- foreach ($rawRequest['ctx']['headers'] as $name => $value) {
- $request = $request->withHeader($name, $value);
- }
-
- if ($rawRequest['ctx']['parsed']) {
- return $request->withParsedBody(json_decode($rawRequest['body'], true));
- }
-
- if ($rawRequest['body'] !== null) {
- return $request->withBody($this->streamFactory->createStream($rawRequest['body']));
- }
+ $_SERVER = $this->configureServer($httpRequest);
- return $request;
+ return $this->mapRequest($httpRequest, $_SERVER);
}
/**
@@ -118,7 +82,7 @@ class PSR7Client
*/
public function respond(ResponseInterface $response): void
{
- $this->httpClient->respond(
+ $this->httpWorker->respond(
$response->getStatusCode(),
$response->getBody()->__toString(),
$response->getHeaders()
@@ -129,21 +93,21 @@ class PSR7Client
* Returns altered copy of _SERVER variable. Sets ip-address,
* request-time and other values.
*
- * @param mixed[] $ctx
+ * @param Request $request
* @return mixed[]
*/
- protected function configureServer(array $ctx): array
+ protected function configureServer(Request $request): array
{
$server = $this->originalServer;
- $server['REQUEST_URI'] = $ctx['uri'];
+ $server['REQUEST_URI'] = $request->uri;
$server['REQUEST_TIME'] = time();
$server['REQUEST_TIME_FLOAT'] = microtime(true);
- $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
- $server['REQUEST_METHOD'] = $ctx['method'];
+ $server['REMOTE_ADDR'] = $request->getRemoteAddr();
+ $server['REQUEST_METHOD'] = $request->method;
$server['HTTP_USER_AGENT'] = '';
- foreach ($ctx['headers'] as $key => $value) {
+ foreach ($request->headers as $key => $value) {
$key = strtoupper(str_replace('-', '_', $key));
if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
$server[$key] = implode(', ', $value);
@@ -156,18 +120,52 @@ class PSR7Client
}
/**
+ * @param Request $httpRequest
+ * @param array $server
+ * @return ServerRequestInterface
+ */
+ protected function mapRequest(Request $httpRequest, array $server): ServerRequestInterface
+ {
+ $request = $this->requestFactory->createServerRequest(
+ $httpRequest->method,
+ $httpRequest->uri,
+ $_SERVER
+ );
+
+
+ $request = $request
+ ->withProtocolVersion(static::fetchProtocolVersion($httpRequest->protocol))
+ ->withCookieParams($httpRequest->cookies)
+ ->withQueryParams($httpRequest->query)
+ ->withUploadedFiles($this->wrapUploads($httpRequest->uploads));
+
+ foreach ($httpRequest->attributes as $name => $value) {
+ $request = $request->withAttribute($name, $value);
+ }
+
+ foreach ($httpRequest->headers as $name => $value) {
+ $request = $request->withHeader($name, $value);
+ }
+
+ if ($httpRequest->parsed) {
+ return $request->withParsedBody($httpRequest->getParsedBody());
+ }
+
+ if ($httpRequest->body !== null) {
+ return $request->withBody($this->streamFactory->createStream($httpRequest->body));
+ }
+
+ return $request;
+ }
+
+ /**
* Wraps all uploaded files with UploadedFile.
*
* @param array[] $files
- *
* @return UploadedFileInterface[]|mixed[]
*/
- private function wrapUploads($files): array
+ protected function wrapUploads(array $files): array
{
- if (empty($files)) {
- return [];
- }
-
$result = [];
foreach ($files as $index => $f) {
if (!isset($f['name'])) {
diff --git a/tests/src/Http/Request.php b/tests/src/Http/Request.php
new file mode 100644
index 00000000..ef67e28d
--- /dev/null
+++ b/tests/src/Http/Request.php
@@ -0,0 +1,48 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Http;
+
+final class Request
+{
+
+ public string $remoteAddr;
+ public string $protocol;
+ public string $method;
+ public string $uri;
+ public array $headers;
+ public array $cookies;
+ public array $uploads;
+ public array $attributes;
+ public array $query;
+ public ?string $body;
+ public bool $parsed;
+
+ /**
+ * @return string
+ */
+ public function getRemoteAddr(): string
+ {
+ return $this->attributes['ipAddress'] ?? $this->remoteAddr ?? '127.0.0.1';
+ }
+
+ /**
+ * @return array|null
+ */
+ public function getParsedBody(): ?array
+ {
+ if ($this->parsed) {
+ return json_decode($this->body, true);
+ }
+
+ return null;
+ }
+}
diff --git a/tests/src/Payload.php b/tests/src/Payload.php
new file mode 100644
index 00000000..c9b8c198
--- /dev/null
+++ b/tests/src/Payload.php
@@ -0,0 +1,43 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+/**
+ * Class Payload
+ *
+ * @package Spiral\RoadRunner
+ */
+final class Payload
+{
+ /**
+ * Execution payload (binary).
+ *
+ * @var string|null
+ */
+ public ?string $body;
+
+ /**
+ * Execution context (binary).
+ *
+ * @var string|null
+ */
+ public ?string $header;
+
+ /**
+ * @param string|null $body
+ * @param string|null $header
+ */
+ public function __construct(?string $body, ?string $header = null)
+ {
+ $this->body = $body;
+ $this->header = $header;
+ }
+}
diff --git a/tests/src/Worker.php b/tests/src/Worker.php
new file mode 100644
index 00000000..53cf6cef
--- /dev/null
+++ b/tests/src/Worker.php
@@ -0,0 +1,162 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exception\GoridgeException;
+use Spiral\Goridge\Frame;
+use Spiral\Goridge\RelayInterface as Relay;
+use Spiral\RoadRunner\Exception\EnvironmentException;
+use Spiral\RoadRunner\Exception\RoadRunnerException;
+
+/**
+ * Accepts connection from RoadRunner server over given Goridge relay.
+ *
+ * $worker = Worker::create();
+ * while ($p = $worker->waitPayload()) {
+ * $worker->send(new Payload("DONE", json_encode($context)));
+ * }
+ */
+class Worker implements WorkerInterface
+{
+ // Request graceful worker termination.
+ private const STOP_REQUEST = '{"stop":true}';
+
+ private Relay $relay;
+
+ /**
+ * @param Relay $relay
+ */
+ public function __construct(Relay $relay)
+ {
+ $this->relay = $relay;
+ }
+
+ /**
+ * Wait for incoming payload from the server. Must return null when worker stopped.
+ *
+ * @return Payload|null
+ * @throws GoridgeException
+ * @throws RoadRunnerException
+ */
+ public function waitPayload(): ?Payload
+ {
+ $frame = $this->relay->waitFrame();
+
+ if ($frame->hasFlag(Frame::CONTROL)) {
+ $continue = $this->handleControl($frame->payload);
+
+ if ($continue) {
+ return $this->waitPayload();
+ } else {
+ return null;
+ }
+ }
+
+ return new Payload(
+ substr($frame->payload, $frame->options[0]),
+ substr($frame->payload, 0, $frame->options[0])
+ );
+ }
+
+ /**
+ * Respond to the server with the processing result.
+ *
+ * @param Payload $payload
+ * @throws GoridgeException
+ */
+ public function respond(Payload $payload): void
+ {
+ $this->send($payload->body, $payload->header);
+ }
+
+ /**
+ * Respond to the server with an error. Error must be treated as TaskError and might not cause
+ * worker destruction.
+ *
+ * Example:
+ *
+ * $worker->error("invalid payload");
+ *
+ * @param string $message
+ */
+ public function error(string $message): void
+ {
+ $this->relay->send(new Frame($message, [], Frame::ERROR));
+ }
+
+ /**
+ * Terminate the process. Server must automatically pass task to the next available process.
+ * Worker will receive StopCommand context after calling this method.
+ *
+ * Attention, you MUST use continue; after invoking this method to let rr to properly
+ * stop worker.
+ *
+ * @throws GoridgeException
+ */
+ public function stop(): void
+ {
+ $this->send("", self::STOP_REQUEST);
+ }
+
+ /**
+ * @param string $body
+ * @param string|null $context
+ * @throws GoridgeException
+ */
+ public function send(string $body, string $context = null): void
+ {
+ $this->relay->send(new Frame(
+ (string) $context . $body,
+ [strlen((string) $context)]
+ ));
+ }
+
+ /**
+ * Return true if continue.
+ *
+ * @param string $header
+ * @return bool
+ *
+ * @throws RoadRunnerException
+ */
+ private function handleControl(string $header): bool
+ {
+ $command = json_decode($header, true);
+ if ($command === false) {
+ throw new RoadRunnerException('Invalid task header, JSON payload is expected');
+ }
+
+ switch (true) {
+ case !empty($command['pid']):
+ $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL));
+ return true;
+
+ case !empty($command['stop']):
+ return false;
+
+ default:
+ throw new RoadRunnerException('Invalid task header, undefined control package');
+ }
+ }
+
+ /**
+ * Create Worker using global environment configuration.
+ *
+ * @return WorkerInterface
+ * @throws EnvironmentException
+ */
+ public static function create(): WorkerInterface
+ {
+ $env = Environment::fromGlobals();
+
+ return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress()));
+ }
+}
diff --git a/tests/src/WorkerInterface.php b/tests/src/WorkerInterface.php
new file mode 100644
index 00000000..bf0b6e06
--- /dev/null
+++ b/tests/src/WorkerInterface.php
@@ -0,0 +1,55 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exception\GoridgeException;
+use Spiral\RoadRunner\Exception\RoadRunnerException;
+
+interface WorkerInterface
+{
+ /**
+ * Wait for incoming payload from the server. Must return null when worker stopped.
+ *
+ * @return Payload|null
+ * @throws GoridgeException
+ * @throws RoadRunnerException
+ */
+ public function waitPayload(): ?Payload;
+
+ /**
+ * Respond to the server with the processing result.
+ *
+ * @param Payload $payload
+ * @throws GoridgeException
+ */
+ public function respond(Payload $payload): void;
+
+ /**
+ * Respond to the server with an error. Error must be treated as TaskError and might not cause
+ * worker destruction.
+ *
+ * Example:
+ *
+ * $worker->error("invalid payload");
+ *
+ * @param string $error
+ * @throws GoridgeException
+ */
+ public function error(string $error): void;
+
+ /**
+ * Terminate the process. Server must automatically pass task to the next available process.
+ * Worker will receive stop command after calling this method.
+ *
+ * Attention, you MUST use continue; after invoking this method to let rr to properly stop worker.
+ */
+ public function stop(): void;
+}
diff --git a/tests/stop.php b/tests/stop.php
index 0100ad0f..f83d3f29 100644
--- a/tests/stop.php
+++ b/tests/stop.php
@@ -9,7 +9,7 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
$used = false;
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
if ($used) {
// kill on second attempt
diff --git a/util/isolate.go b/util/isolate.go
index 2bdb9d6c..0ea1dff3 100755
--- a/util/isolate.go
+++ b/util/isolate.go
@@ -26,12 +26,12 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error {
return errors.E(op, err)
}
- usrI32, err := strconv.Atoi(usr.Uid)
+ usrI32, err := strconv.ParseInt(usr.Uid, 10, 32)
if err != nil {
return errors.E(op, err)
}
- grI32, err := strconv.Atoi(usr.Gid)
+ grI32, err := strconv.ParseInt(usr.Gid, 10, 32)
if err != nil {
return errors.E(op, err)
}
diff --git a/worker.go b/worker.go
index 402e9b90..d860b3af 100755
--- a/worker.go
+++ b/worker.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "io"
"os"
"os/exec"
"strconv"
@@ -14,7 +15,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/util"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"go.uber.org/multierr"
)
@@ -57,6 +58,13 @@ type WorkerEvent struct {
Payload interface{}
}
+var pool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, 10240)
+ return &buf
+ },
+}
+
type WorkerBase interface {
fmt.Stringer
@@ -130,10 +138,12 @@ type WorkerProcess struct {
endState *os.ProcessState
// ensures than only one execution can be run at once.
- mu sync.Mutex
+ mu sync.RWMutex
// communication bus with underlying process.
relay goridge.Relay
+ rd io.Reader
+ stop chan struct{}
}
// InitBaseWorker creates new WorkerProcess over given exec.cmd.
@@ -147,13 +157,19 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
cmd: cmd,
state: newState(StateInactive),
stderr: new(bytes.Buffer),
+ stop: make(chan struct{}, 1),
}
+ w.rd, w.cmd.Stderr = io.Pipe()
+
// small buffer optimization
// at this point we know, that stderr will contain huge messages
- w.stderr.Grow(1024)
+ w.stderr.Grow(10240)
+
+ go func() {
+ w.watch()
+ }()
- w.cmd.Stderr = w
return w, nil
}
@@ -222,6 +238,7 @@ func (w *WorkerProcess) Start() error {
func (w *WorkerProcess) Wait() error {
const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
// and then w.cmd.Wait return an error
@@ -229,10 +246,14 @@ func (w *WorkerProcess) Wait() error {
if err != nil {
w.state.Set(StateErrored)
+ w.mu.RLock()
// if process return code > 0, here will be an error from stderr (if presents)
if w.stderr.Len() > 0 {
err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr buffer
+ w.stop <- struct{}{}
}
+ w.mu.RUnlock()
return multierr.Append(err, w.closeRelay())
}
@@ -299,16 +320,48 @@ func (w *WorkerProcess) Kill() error {
return nil
}
+// put the pointer, to not allocate new slice
+// but erase it len and then return back
+func (w *WorkerProcess) put(data *[]byte) {
+ *data = (*data)[:0]
+ *data = (*data)[:cap(*data)]
+
+ pool.Put(data)
+}
+
+// get pointer to the byte slice
+func (w *WorkerProcess) get() *[]byte {
+ return pool.Get().(*[]byte)
+}
+
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *WorkerProcess) Write(p []byte) (int, error) {
- w.mu.Lock()
- defer w.mu.Unlock()
- // clean all previous messages in the stderr
- w.stderr.Truncate(0)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p})
- // write new message
- w.stderr.Write(p)
-
- return len(p), nil
+func (w *WorkerProcess) watch() {
+ go func() {
+ for {
+ select {
+ case <-w.stop:
+ buf := w.get()
+ // read the last data
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ return
+ default:
+ // read the max 10kb of stderr per one read
+ buf := w.get()
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ }
+ }
+ }()
}
diff --git a/worker_watcher.go b/worker_watcher.go
index 8bc147d0..f8fb67a9 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -11,40 +11,46 @@ import (
)
type Stack struct {
- workers []WorkerBase
- mutex sync.RWMutex
- destroy bool
+ workers []WorkerBase
+ mutex sync.RWMutex
+ destroy bool
+ actualNumOfWorkers int64
}
func NewWorkersStack() *Stack {
+ w := runtime.NumCPU()
return &Stack{
- workers: make([]WorkerBase, 0, runtime.NumCPU()),
+ workers: make([]WorkerBase, 0, w),
+ actualNumOfWorkers: 0,
}
}
func (stack *Stack) Reset() {
stack.mutex.Lock()
defer stack.mutex.Unlock()
-
+ stack.actualNumOfWorkers = 0
stack.workers = nil
}
+// Push worker back to the stack
+// If stack in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w WorkerBase) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
+ stack.actualNumOfWorkers++
stack.workers = append(stack.workers, w)
}
func (stack *Stack) IsEmpty() bool {
stack.mutex.Lock()
defer stack.mutex.Unlock()
-
return len(stack.workers) == 0
}
func (stack *Stack) Pop() (WorkerBase, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
+
// do not release new stack
if stack.destroy {
return nil, true
@@ -54,12 +60,83 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
return nil, false
}
+ // move worker
w := stack.workers[len(stack.workers)-1]
stack.workers = stack.workers[:len(stack.workers)-1]
-
+ stack.actualNumOfWorkers--
return w, false
}
+func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ for i := 0; i < len(stack.workers); i++ {
+ // worker in the stack, reallocating
+ if stack.workers[i].Pid() == pid {
+ stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
+ stack.actualNumOfWorkers--
+ // worker found and removed
+ return true
+ }
+ }
+ // no worker with such ID
+ return false
+}
+
+// Workers return copy of the workers in the stack
+func (stack *Stack) Workers() []WorkerBase {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ workersCopy := make([]WorkerBase, 0, 1)
+ // copy
+ for _, v := range stack.workers {
+ sw := v.(SyncWorker)
+ workersCopy = append(workersCopy, sw)
+ }
+
+ return workersCopy
+}
+
+func (stack *Stack) isDestroying() bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ return stack.destroy
+}
+
+// we also have to give a chance to pool to Push worker (return it)
+func (stack *Stack) Destroy(ctx context.Context) {
+ stack.mutex.Lock()
+ stack.destroy = true
+ stack.mutex.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ for {
+ select {
+ case <-tt.C:
+ stack.mutex.Lock()
+ // that might be one of the workers is working
+ if len(stack.workers) != int(stack.actualNumOfWorkers) {
+ stack.mutex.Unlock()
+ continue
+ }
+ stack.mutex.Unlock()
+ // unnecessary mutex, but
+ // just to make sure. All stack at this moment are in the stack
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ stack.mutex.Lock()
+ for i := 0; i < len(stack.workers); i++ {
+ // set state for the stack in the stack (unused at the moment)
+ stack.workers[i].State().Set(StateDestroyed)
+ }
+ stack.mutex.Unlock()
+ tt.Stop()
+ // clear
+ stack.Reset()
+ return
+ }
+ }
+}
+
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
AddToWatch(workers []WorkerBase) error
@@ -151,7 +228,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
if w == nil {
continue
}
- ww.ReduceWorkersCount()
return w, nil
case <-ctx.Done():
return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
@@ -159,7 +235,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
}
- ww.ReduceWorkersCount()
return w, nil
}
@@ -184,91 +259,41 @@ func (ww *workerWatcher) AllocateNew() error {
}
func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
- ww.stack.mutex.Lock()
+ ww.mutex.Lock()
+ defer ww.mutex.Unlock()
+
const op = errors.Op("remove worker")
- defer ww.stack.mutex.Unlock()
pid := wb.Pid()
- for i := 0; i < len(ww.stack.workers); i++ {
- if ww.stack.workers[i].Pid() == pid {
- // found in the stack
- // remove worker
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.ReduceWorkersCount()
-
- wb.State().Set(StateInvalid)
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
- }
- break
+
+ if ww.stack.FindAndRemoveByPid(pid) {
+ wb.State().Set(StateInvalid)
+ err := wb.Kill()
+ if err != nil {
+ return errors.E(op, err)
}
+ return nil
}
- // worker currently handle request, set state Remove
+
wb.State().Set(StateRemove)
return nil
}
// O(1) operation
func (ww *workerWatcher) PushWorker(w WorkerBase) {
- ww.IncreaseWorkersCount()
- ww.stack.Push(w)
-}
-
-func (ww *workerWatcher) ReduceWorkersCount() {
- ww.mutex.Lock()
- ww.actualNumWorkers--
- ww.mutex.Unlock()
-}
-func (ww *workerWatcher) IncreaseWorkersCount() {
ww.mutex.Lock()
- ww.actualNumWorkers++
- ww.mutex.Unlock()
+ defer ww.mutex.Unlock()
+ ww.stack.Push(w)
}
// Destroy all underlying stack (but let them to complete the task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- ww.stack.mutex.Lock()
- ww.stack.destroy = true
- ww.stack.mutex.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 100)
- for {
- select {
- case <-tt.C:
- ww.stack.mutex.Lock()
- if len(ww.stack.workers) != int(ww.actualNumWorkers) {
- ww.stack.mutex.Unlock()
- continue
- }
- ww.stack.mutex.Unlock()
- // unnecessary mutex, but
- // just to make sure. All stack at this moment are in the stack
- // Pop operation is blocked, push can't be done, since it's not possible to pop
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // set state for the stack in the stack (unused at the moment)
- ww.stack.workers[i].State().Set(StateDestroyed)
- }
- ww.stack.mutex.Unlock()
- tt.Stop()
- // clear
- ww.stack.Reset()
- return
- }
- }
+ // destroy stack, we don't use ww mutex here, since we should be able to push worker
+ ww.stack.Destroy(ctx)
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) WorkersList() []WorkerBase {
- ww.stack.mutex.Lock()
- defer ww.stack.mutex.Unlock()
- workersCopy := make([]WorkerBase, 0, 1)
- for _, v := range ww.stack.workers {
- sw := v.(SyncWorker)
- workersCopy = append(workersCopy, sw)
- }
-
- return workersCopy
+ return ww.stack.Workers()
}
func (ww *workerWatcher) wait(w WorkerBase) {
@@ -287,37 +312,13 @@ func (ww *workerWatcher) wait(w WorkerBase) {
return
}
- pid := w.Pid()
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // worker in the stack, reallocating
- if ww.stack.workers[i].Pid() == pid {
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.ReduceWorkersCount()
- ww.stack.mutex.Unlock()
-
- err = ww.AllocateNew()
- if err != nil {
- ww.events.Push(PoolEvent{
- Event: EventPoolError,
- Payload: errors.E(op, err),
- })
- }
-
- return
- }
- }
-
- ww.stack.mutex.Unlock()
-
- // worker not in the stack (not returned), forget and allocate new
+ _ = ww.stack.FindAndRemoveByPid(w.Pid())
err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
Payload: errors.E(op, err),
})
- return
}
}