diff options
76 files changed, 1067 insertions, 1078 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8cc0291c..070350a0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,55 +5,16 @@ on: pull_request: jobs: - php: - name: Build (PHP ${{ matrix.php }}, ${{ matrix.setup }} setup) - runs-on: ubuntu-20.04 - timeout-minutes: 6 - strategy: - fail-fast: false - matrix: - php: [ '7.4', '8.0' ] - setup: [ basic, lowest ] - steps: - - name: Set up PHP ${{ matrix.php }} - uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php> - with: - php-version: ${{ matrix.php }} - - - name: Check out code - uses: actions/checkout@v2 - - - name: Syntax check only (lint) - run: find ./src/ ./tests/ -name "*.php" -print0 | xargs -0 -n1 -P8 php -l - - - name: Get Composer Cache Directory - id: composer-cache - run: echo "::set-output name=dir::$(composer config cache-files-dir)" - - - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer> - uses: actions/cache@v2 - with: - path: ${{ steps.composer-cache.outputs.dir }} - key: ${{ runner.os }}-composer-${{ matrix.setup }}-${{ hashFiles('**/composer.json') }} - restore-keys: ${{ runner.os }}-composer- - - - name: Install lowest Composer dependencies - if: matrix.setup == 'lowest' - run: composer update --prefer-dist --no-progress --prefer-lowest --ansi - - - name: Install basic Composer dependencies - if: matrix.setup == 'basic' - run: composer update --prefer-dist --no-progress --ansi - golang: - name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}) - runs-on: ubuntu-20.04 - timeout-minutes: 10 + name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) + runs-on: ${{ matrix.os }} + timeout-minutes: 15 strategy: fail-fast: false matrix: php: [ '7.4', '8.0' ] go: [ '1.14', '1.15' ] + os: [ ubuntu-latest, windows-latest, macos-latest ] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go> @@ -64,15 +25,18 @@ jobs: uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php> with: php-version: ${{ matrix.php }} + extensions: sockets - name: Check out code uses: actions/checkout@v2 - name: Get Composer Cache Directory + if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} id: composer-cache run: echo "::set-output name=dir::$(composer config cache-files-dir)" - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer> + if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} uses: actions/cache@v2 with: path: ${{ steps.composer-cache.outputs.dir }} @@ -80,7 +44,7 @@ jobs: restore-keys: ${{ runner.os }}-composer- - name: Install Composer dependencies - run: composer update --prefer-dist --no-progress --ansi + run: cd tests && composer update --prefer-dist --no-progress --ansi - name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules> uses: actions/cache@v2 @@ -92,7 +56,28 @@ jobs: - name: Install Go dependencies run: go mod download - - name: Run golang tests + - name: Run golang tests on Windows without codecov + if: ${{ matrix.os == 'windows-latest' }} + run: | + go test -v -race -cover -tags=debug . + go test -v -race -cover -tags=debug ./plugins/rpc + go test -v -race -cover -tags=debug ./plugins/rpc/tests + go test -v -race -cover -tags=debug ./plugins/config/tests + go test -v -race -cover -tags=debug ./plugins/logger/tests + go test -v -race -cover -tags=debug ./plugins/server/tests + go test -v -race -cover -tags=debug ./plugins/metrics/tests + go test -v -race -cover -tags=debug ./plugins/informer/tests + go test -v -race -cover -tags=debug ./plugins/resetter/tests + go test -v -race -cover -tags=debug ./plugins/http/attributes + go test -v -race -cover -tags=debug ./plugins/http/tests + go test -v -race -cover -tags=debug ./plugins/gzip/tests + go test -v -race -cover -tags=debug ./plugins/static/tests + go test -v -race -cover -tags=debug ./plugins/static + go test -v -race -cover -tags=debug ./plugins/headers/tests + go test -v -race -cover -tags=debug ./plugins/checker/tests + + - name: Run golang tests on Linux and MacOS + if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/lib.txt -covermode=atomic . @@ -114,6 +99,7 @@ jobs: cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt - uses: codecov/codecov-action@v1 # Docs: <https://github.com/codecov/codecov-action> + if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} with: token: ${{ secrets.CODECOV_TOKEN }} file: ./coverage-ci/summary.txt @@ -21,5 +21,4 @@ vendor vendor_php builds/ tests/vendor/ -.rr-sample.yaml -psr-worker.php +.rr-sample.yaml
\ No newline at end of file @@ -24,20 +24,21 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test: ## Run application tests - test -d ./vendor_php || composer update --prefer-dist --ansi - go test -v -race -cover - go test -v -race -cover ./util - go test -v -race -cover ./service - go test -v -race -cover ./service/env - go test -v -race -cover ./service/rpc - go test -v -race -cover ./service/http - go test -v -race -cover ./service/static - go test -v -race -cover ./service/limit - go test -v -race -cover ./service/headers - go test -v -race -cover ./service/metrics - go test -v -race -cover ./service/health - go test -v -race -cover ./service/gzip - go test -v -race -cover ./service/reload + go test -v -race -cover -tags=debug -covermode=atomic . + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/rpc/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/config/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/logger/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/server/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/metrics/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/informer/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/resetter/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/http/attributes + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/http/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/gzip/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/static/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/headers/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/checker/tests lint: ## Run application linters go fmt ./... @@ -1,10 +1,4 @@ status = [ -'Build (PHP 7.3, basic setup)', -'Build (PHP 7.3, lowest setup)', -'Build (PHP 8.0, basic setup)', -'Build (PHP 8.0, lowest setup)', -'Build (Go 1.14, PHP 7.3)', -'Build (Go 1.15, PHP 7.3)', 'Build (Go 1.14, PHP 7.4)', 'Build (Go 1.15, PHP 7.4)', 'Build (Go 1.14, PHP 8.0)', diff --git a/composer.json b/composer.json deleted file mode 100755 index e3017b97..00000000 --- a/composer.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "name": "spiral/roadrunner", - "type": "server", - "description": "High-performance PHP application server, load-balancer and process manager written in Golang", - "license": "MIT", - "authors": [ - { - "name": "Anton Titov / Wolfy-J", - "email": "[email protected]" - }, - { - "name": "RoadRunner Community", - "homepage": "https://github.com/spiral/roadrunner/graphs/contributors" - } - ], - "require": { - "php": "^7.4 || ^8.0", - "ext-json": "*", - "ext-curl": "*", - "spiral/goridge": "^2.4.2", - "psr/http-factory": "^1.0.1", - "psr/http-message": "^1.0.1", - "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0", - "laminas/laminas-diactoros": "^1.3.6 || ^2.0", - "composer/package-versions-deprecated": "^1.8" - }, - "config": { - "vendor-dir": "vendor_php" - }, - "require-dev": { - "phpstan/phpstan": "~0.12.34" - }, - "scripts": { - "analyze": "phpstan analyze -c ./phpstan.neon.dist --no-progress --ansi" - }, - "autoload": { - "psr-4": { - "Spiral\\RoadRunner\\": "src/" - } - }, - "bin": [ - "bin/rr" - ] -}
\ No newline at end of file @@ -4,21 +4,19 @@ go 1.15 require ( github.com/NYTimes/gziphandler v1.1.1 + github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/fatih/color v1.10.0 + github.com/go-ole/go-ole v1.2.4 // indirect github.com/gofiber/fiber/v2 v2.2.3 github.com/golang/mock v1.4.4 github.com/hashicorp/go-multierror v1.1.0 github.com/json-iterator/go v1.1.10 - github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 github.com/shirou/gopsutil v3.20.11+incompatible - github.com/sirupsen/logrus v1.6.0 - github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta20 github.com/spiral/errors v1.0.5 - github.com/spiral/goridge/v2 v2.4.6 - github.com/spiral/roadrunner v1.9.0 + github.com/spiral/goridge/v3 v3.0.0-beta7 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/yookoala/gofast v0.4.0 @@ -34,7 +34,6 @@ github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= -github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= @@ -48,11 +47,9 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= -github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= -github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -65,7 +62,6 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= -github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -74,7 +70,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -82,8 +77,6 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= @@ -165,7 +158,6 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51 github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -236,12 +228,10 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= -github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= @@ -250,16 +240,11 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= -github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI= -github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= @@ -285,8 +270,6 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8= -github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -361,7 +344,6 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto= github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -386,14 +368,11 @@ github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= -github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8= -github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spiral/endure v1.0.0-beta20 h1:QD3EJ6CRLgeo/6trfnlUcQhH3vrK8Hvf9ceDpde+yss= @@ -401,10 +380,10 @@ github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLq github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k= -github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc= -github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4= -github.com/spiral/roadrunner v1.9.0/go.mod h1:Q1al1YGjs7ZHVkAA7+gUKM0rwk6XWG07G0UjyjjuK+0= +github.com/spiral/goridge/v3 v3.0.0-beta6 h1:R+MQy93vUWn7zOvdFt8m3WMiTuLoP921IikQpGe9xXo= +github.com/spiral/goridge/v3 v3.0.0-beta6/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM= +github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -419,7 +398,6 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -428,8 +406,9 @@ github.com/valyala/fasthttp v1.17.0 h1:P8/koH4aSnJ4xbd0cUUFEGQs3jQqIxoDDyRQrUiAk github.com/valyala/fasthttp v1.17.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo= github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -499,13 +478,11 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= @@ -607,6 +584,7 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= diff --git a/phpstan.neon.dist b/phpstan.neon.dist deleted file mode 100644 index b5fec74d..00000000 --- a/phpstan.neon.dist +++ /dev/null @@ -1,4 +0,0 @@ -parameters: - level: 'max' - paths: - - src
\ No newline at end of file diff --git a/pipe_factory.go b/pipe_factory.go index 3e98bd4e..db00c989 100755 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -5,7 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "go.uber.org/multierr" ) diff --git a/pipe_factory_test.go b/pipe_factory_test.go index bdb861de..c742f522 100755 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -105,7 +105,7 @@ func Test_Pipe_Echo(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go index c93b68af..a37fc08e 100644 --- a/plugins/checker/tests/plugin_test.go +++ b/plugins/checker/tests/plugin_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2/interfaces/status" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml b/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml index df02c043..3dc5f9df 100644 --- a/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml +++ b/plugins/gzip/tests/configs/.rr-http-middlewareNotExist.yaml @@ -1,5 +1,5 @@ server: - command: "php psr-worker.php" + command: "php ../../../tests/psr-worker.php" user: "" group: "" env: diff --git a/plugins/gzip/tests/configs/.rr-http-withGzip.yaml b/plugins/gzip/tests/configs/.rr-http-withGzip.yaml index b91a9aad..38fdfe47 100644 --- a/plugins/gzip/tests/configs/.rr-http-withGzip.yaml +++ b/plugins/gzip/tests/configs/.rr-http-withGzip.yaml @@ -1,5 +1,5 @@ server: - command: "php psr-worker.php" + command: "php ../../../tests/psr-worker.php" user: "" group: "" env: diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 371cdb91..a6399489 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -183,6 +183,11 @@ func (s *Plugin) Serve() chan error { s.fcgi = &http.Server{Handler: s} } + // apply middlewares before starting the server + if len(s.mdwr) > 0 { + s.addMiddlewares() + } + if s.http != nil { go func() { httpErr := s.http.ListenAndServe() @@ -217,10 +222,6 @@ func (s *Plugin) Serve() chan error { }() } - if len(s.mdwr) > 0 { - s.addMiddlewares() - } - return errCh } diff --git a/plugins/http/tests/configs/.rr-h2c.yaml b/plugins/http/tests/configs/.rr-h2c.yaml index 316daea9..d1b24338 100644 --- a/plugins/http/tests/configs/.rr-h2c.yaml +++ b/plugins/http/tests/configs/.rr-h2c.yaml @@ -20,15 +20,6 @@ http: maxJobs: 0 allocateTimeout: 60s destroyTimeout: 60s - - ssl: - port: 8891 - redirect: false - cert: fixtures/server.crt - key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 http2: enabled: true h2c: true diff --git a/plugins/http/tests/configs/.rr-ssl-push.yaml b/plugins/http/tests/configs/.rr-ssl-push.yaml index 90a99192..02de906a 100644 --- a/plugins/http/tests/configs/.rr-ssl-push.yaml +++ b/plugins/http/tests/configs/.rr-ssl-push.yaml @@ -26,10 +26,4 @@ http: redirect: true cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 - http2: - enabled: false - h2c: false - maxConcurrentStreams: 128
\ No newline at end of file + # rootCa: root.crt
\ No newline at end of file diff --git a/plugins/http/tests/configs/.rr-ssl-redirect.yaml b/plugins/http/tests/configs/.rr-ssl-redirect.yaml index 1878ba53..0ba1753e 100644 --- a/plugins/http/tests/configs/.rr-ssl-redirect.yaml +++ b/plugins/http/tests/configs/.rr-ssl-redirect.yaml @@ -26,10 +26,4 @@ http: redirect: true cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 - http2: - enabled: false - h2c: false - maxConcurrentStreams: 128
\ No newline at end of file + # rootCa: root.crt
\ No newline at end of file diff --git a/plugins/http/tests/configs/.rr-ssl.yaml b/plugins/http/tests/configs/.rr-ssl.yaml index 127c1678..fb54d3fa 100644 --- a/plugins/http/tests/configs/.rr-ssl.yaml +++ b/plugins/http/tests/configs/.rr-ssl.yaml @@ -28,7 +28,7 @@ http: key: fixtures/server.key # rootCa: root.crt fcgi: - address: tcp://0.0.0.0:6920 + address: tcp://0.0.0.0:16920 http2: enabled: false h2c: false diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index 06bf3f5d..f68cd42c 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -17,7 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/spiral/endure" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" @@ -334,7 +334,7 @@ func sslEcho(t *testing.T) { } func fcgiEcho(t *testing.T) { - fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6920") + fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:16920") fcgiHandler := gofast.NewHandler( gofast.BasicParamsMap(gofast.BasicSession), diff --git a/plugins/http/tests/uploads_test.go b/plugins/http/tests/uploads_test.go index ee244c06..d36d4793 100644 --- a/plugins/http/tests/uploads_test.go +++ b/plugins/http/tests/uploads_test.go @@ -269,7 +269,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 200, r.StatusCode) - fs := fileString(testFile, 5, "application/octet-stream") + fs := fileString(testFile, 6, "application/octet-stream") assert.Equal(t, `{"upload":`+fs+`}`, string(b)) } @@ -352,7 +352,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 200, r.StatusCode) - fs := fileString(testFile, 7, "application/octet-stream") + fs := fileString(testFile, 8, "application/octet-stream") assert.Equal(t, `{"upload":`+fs+`}`, string(b)) } diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go index 5fddb75d..aeb41591 100644 --- a/plugins/http/uploads.go +++ b/plugins/http/uploads.go @@ -18,13 +18,13 @@ const ( UploadErrorNoFile = 4 // UploadErrorNoTmpDir - missing a temporary folder. - UploadErrorNoTmpDir = 5 + UploadErrorNoTmpDir = 6 // UploadErrorCantWrite - failed to write file to disk. - UploadErrorCantWrite = 6 + UploadErrorCantWrite = 7 // UploadErrorExtension - forbidden file extension. - UploadErrorExtension = 7 + UploadErrorExtension = 8 ) // Uploads tree manages uploaded files tree and temporary files. diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go index fbe33a7d..9e21e7ea 100644 --- a/plugins/informer/tests/informer_test.go +++ b/plugins/informer/tests/informer_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go index f9014c95..57b10aa4 100644 --- a/plugins/metrics/tests/metrics_test.go +++ b/plugins/metrics/tests/metrics_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/metrics" diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go index a1873dd4..3bfccf47 100644 --- a/plugins/resetter/tests/resetter_test.go +++ b/plugins/resetter/tests/resetter_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestInformerInit(t *testing.T) { +func TestResetterInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) if err != nil { t.Fatal(err) @@ -53,7 +53,7 @@ func TestInformerInit(t *testing.T) { tt := time.NewTimer(time.Second * 15) - t.Run("InformerRpcTest", resetterRPCTest) + t.Run("ResetterRpcTest", resetterRPCTest) for { select { @@ -94,6 +94,7 @@ func resetterRPCTest(t *testing.T) { var services []string err = client.Call("resetter.List", nil, &services) + assert.NotNil(t, services) assert.NoError(t, err) if services[0] != "resetter.plugin1" { t.Fatal("no enough services") diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go index 8b1d974a..67532bc8 100755 --- a/plugins/rpc/config_test.go +++ b/plugins/rpc/config_test.go @@ -1,6 +1,7 @@ package rpc import ( + "runtime" "testing" j "github.com/json-iterator/go" @@ -29,7 +30,11 @@ func TestConfig_Listener(t *testing.T) { }() assert.Equal(t, "tcp", ln.Addr().Network()) - assert.Equal(t, "0.0.0.0:18001", ln.Addr().String()) + if runtime.GOOS == "windows" { + assert.Equal(t, "[::]:18001", ln.Addr().String()) + } else { + assert.Equal(t, "0.0.0.0:18001", ln.Addr().String()) + } } func TestConfig_ListenerUnix(t *testing.T) { diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 24624d91..c8e63496 100755 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2/interfaces/log" rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go index 854bf097..347e0330 100644 --- a/plugins/rpc/tests/plugin2.go +++ b/plugins/rpc/tests/plugin2.go @@ -6,7 +6,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" ) // plugin2 makes a call to the plugin1 via RPC diff --git a/plugins/server/tests/socket.php b/plugins/server/tests/socket.php index 143c3ce4..1b76481a 100644 --- a/plugins/server/tests/socket.php +++ b/plugins/server/tests/socket.php @@ -6,7 +6,7 @@ use Spiral\Goridge; use Spiral\RoadRunner; -require dirname(__DIR__) . "/../../vendor_php/autoload.php"; +require dirname(__DIR__) . "/../../tests/vendor/autoload.php"; $relay = new Goridge\SocketRelay( "unix.sock", @@ -16,9 +16,9 @@ $relay = new Goridge\SocketRelay( $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send((string)$in); + $rr->send((string)$in->body); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/plugins/server/tests/tcp.php b/plugins/server/tests/tcp.php index 2d6fb00a..c567c982 100644 --- a/plugins/server/tests/tcp.php +++ b/plugins/server/tests/tcp.php @@ -6,15 +6,15 @@ use Spiral\Goridge; use Spiral\RoadRunner; -require dirname(__DIR__) . "/../../vendor_php/autoload.php"; +require dirname(__DIR__) . "/../../tests/vendor/autoload.php"; $relay = new Goridge\SocketRelay("localhost", 9999); $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send((string)$in); + $rr->send((string)$in->body); } catch (\Throwable $e) { $rr->error((string)$e); } -}
\ No newline at end of file +} diff --git a/protocol.go b/protocol.go index fe1f23a7..ee2d8245 100755 --- a/protocol.go +++ b/protocol.go @@ -5,7 +5,7 @@ import ( j "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" ) var json = j.ConfigCompatibleWithStandardLibrary @@ -19,9 +19,22 @@ type pidCommand struct { } func sendControl(rl goridge.Relay, v interface{}) error { - const op = errors.Op("send control") + const op = errors.Op("send control frame") + frame := goridge.NewFrame() + frame.WriteVersion(goridge.VERSION_1) + frame.WriteFlags(goridge.CONTROL) + if data, ok := v.([]byte); ok { - err := rl.Send(data, goridge.PayloadControl|goridge.PayloadRaw) + // check if payload no more that 4Gb + if uint32(len(data)) > ^uint32(0) { + return errors.E(op, errors.Str("payload is more that 4gb")) + } + + frame.WritePayloadLen(uint32(len(data))) + frame.WritePayload(data) + frame.WriteCRC() + + err := rl.Send(frame) if err != nil { return errors.E(op, err) } @@ -33,7 +46,16 @@ func sendControl(rl goridge.Relay, v interface{}) error { return errors.E(op, errors.Errorf("invalid payload: %s", err)) } - return rl.Send(data, goridge.PayloadControl) + frame.WritePayloadLen(uint32(len(data))) + frame.WritePayload(data) + frame.WriteCRC() + + err = rl.Send(frame) + if err != nil { + return errors.E(op, err) + } + + return nil } func fetchPID(rl goridge.Relay) (int64, error) { @@ -43,16 +65,26 @@ func fetchPID(rl goridge.Relay) (int64, error) { return 0, errors.E(op, err) } - body, p, err := rl.Receive() + frameR := goridge.NewFrame() + err = rl.Receive(frameR) + if !frameR.VerifyCRC() { + return 0, errors.E(op, errors.Str("CRC mismatch")) + } if err != nil { return 0, errors.E(op, err) } - if !p.HasFlag(goridge.PayloadControl) { - return 0, errors.E(op, errors.Str("unexpected response, header is missing")) + if frameR == nil { + return 0, errors.E(op, errors.Str("nil frame received")) + } + + flags := frameR.ReadFlags() + + if flags&(byte(goridge.CONTROL)) == 0 { + return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag")) } link := &pidCommand{} - err = json.Unmarshal(body, link) + err = json.Unmarshal(frameR.Payload(), link) if err != nil { return 0, errors.E(op, err) } diff --git a/protocol_test.go b/protocol_test.go deleted file mode 100755 index 396ce992..00000000 --- a/protocol_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package roadrunner - -import ( - "testing" - - "github.com/pkg/errors" - "github.com/spiral/goridge/v2" - "github.com/stretchr/testify/assert" -) - -type relayMock struct { - error bool - payload string -} - -func (r *relayMock) Send(data []byte, flags byte) (err error) { - if r.error { - return errors.New("send error") - } - - return nil -} - -func (r *relayMock) Receive() (data []byte, p goridge.Prefix, err error) { - return []byte(r.payload), goridge.NewPrefix().WithFlag(goridge.PayloadControl), nil -} - -func (r *relayMock) Close() error { - return nil -} - -func Test_Protocol_Errors(t *testing.T) { - err := sendControl(&relayMock{}, make(chan int)) - assert.Error(t, err) -} - -func Test_Protocol_FetchPID(t *testing.T) { - pid, err := fetchPID(&relayMock{error: false, payload: "{\"pid\":100}"}) - assert.NoError(t, err) - assert.Equal(t, int64(100), pid) - - _, err = fetchPID(&relayMock{error: true, payload: "{\"pid\":100}"}) - assert.Error(t, err) - - _, err = fetchPID(&relayMock{error: false, payload: "{\"pid:100"}) - assert.Error(t, err) -} diff --git a/socket_factory.go b/socket_factory.go index 472e5a05..e517c03f 100755 --- a/socket_factory.go +++ b/socket_factory.go @@ -10,7 +10,7 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -215,7 +215,7 @@ func (f *SocketFactory) findRelay(w WorkerBase) (*goridge.SocketRelay, error) { } // chan to store relay associated with specific pid -func (f *SocketFactory) attachRelayToPid(pid int64, relay *goridge.SocketRelay) { +func (f *SocketFactory) attachRelayToPid(pid int64, relay goridge.Relay) { f.relays.Store(pid, relay) } diff --git a/socket_factory_test.go b/socket_factory_test.go index ab6927bd..bbe8cc31 100755 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -254,7 +254,7 @@ func Test_Tcp_Echo(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } @@ -442,7 +442,7 @@ func Test_Unix_Echo(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php deleted file mode 100644 index 6a42f207..00000000 --- a/src/Diactoros/ServerRequestFactory.php +++ /dev/null @@ -1,28 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Diactoros; - -use Psr\Http\Message\ServerRequestFactoryInterface; -use Psr\Http\Message\ServerRequestInterface; -use Laminas\Diactoros\ServerRequest; - -final class ServerRequestFactory implements ServerRequestFactoryInterface -{ - /** - * @inheritdoc - * - * @param array<mixed> $serverParams Array of SAPI parameters with which to seed the generated request instance. - */ - public function createServerRequest(string $method, $uri, array $serverParams = []): ServerRequestInterface - { - $uploadedFiles = []; - return new ServerRequest($serverParams, $uploadedFiles, $uri, $method); - } -} diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php deleted file mode 100644 index 68a77e92..00000000 --- a/src/Diactoros/StreamFactory.php +++ /dev/null @@ -1,57 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Diactoros; - -use RuntimeException; -use Psr\Http\Message\StreamFactoryInterface; -use Psr\Http\Message\StreamInterface; -use Laminas\Diactoros\Stream; - -final class StreamFactory implements StreamFactoryInterface -{ - /** - * @inheritdoc - * @throws RuntimeException - */ - public function createStream(string $content = ''): StreamInterface - { - $resource = fopen('php://temp', 'rb+'); - - if (! \is_resource($resource)) { - throw new RuntimeException('Cannot create stream'); - } - - fwrite($resource, $content); - rewind($resource); - return $this->createStreamFromResource($resource); - } - - /** - * @inheritdoc - */ - public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface - { - $resource = fopen($file, $mode); - - if (! \is_resource($resource)) { - throw new RuntimeException('Cannot create stream'); - } - - return $this->createStreamFromResource($resource); - } - - /** - * @inheritdoc - */ - public function createStreamFromResource($resource): StreamInterface - { - return new Stream($resource); - } -} diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php deleted file mode 100644 index daa475c1..00000000 --- a/src/Diactoros/UploadedFileFactory.php +++ /dev/null @@ -1,36 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Diactoros; - -use Psr\Http\Message\StreamInterface; -use Psr\Http\Message\UploadedFileFactoryInterface; -use Psr\Http\Message\UploadedFileInterface; -use Laminas\Diactoros\UploadedFile; - -final class UploadedFileFactory implements UploadedFileFactoryInterface -{ - /** - * @inheritdoc - */ - public function createUploadedFile( - StreamInterface $stream, - int $size = null, - int $error = \UPLOAD_ERR_OK, - string $clientFilename = null, - string $clientMediaType = null - ): UploadedFileInterface { - if ($size === null) { - $size = (int) $stream->getSize(); - } - - /** @var resource $stream */ - return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType); - } -} diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php deleted file mode 100644 index d5b738b8..00000000 --- a/src/Exception/MetricException.php +++ /dev/null @@ -1,17 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Exception; - -use Spiral\Goridge\Exceptions\RPCException; - -class MetricException extends RPCException -{ -} diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php deleted file mode 100644 index 43967893..00000000 --- a/src/Exceptions/RoadRunnerException.php +++ /dev/null @@ -1,18 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Exceptions; - -/** - * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead - */ -class RoadRunnerException extends \RuntimeException -{ -} diff --git a/src/HttpClient.php b/src/HttpClient.php deleted file mode 100644 index 9b9048ca..00000000 --- a/src/HttpClient.php +++ /dev/null @@ -1,74 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Alex Bond - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -final class HttpClient -{ - /** @var Worker */ - private $worker; - - /** - * @param Worker $worker - */ - public function __construct(Worker $worker) - { - $this->worker = $worker; - } - - /** - * @return Worker - */ - public function getWorker(): Worker - { - return $this->worker; - } - - /** - * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] - * or null if termination request or invalid context. - */ - public function acceptRequest(): ?array - { - $body = $this->getWorker()->receive($ctx); - if (empty($body) && empty($ctx)) { - // termination request - return null; - } - - $ctx = json_decode($ctx, true); - if ($ctx === null) { - // invalid context - return null; - } - - return ['ctx' => $ctx, 'body' => $body]; - } - - /** - * Send response to the application server. - * - * @param int $status Http status code - * @param string $body Body of response - * @param string[][] $headers An associative array of the message's headers. Each - * key MUST be a header name, and each value MUST be an array of strings - * for that header. - */ - public function respond(int $status, string $body, array $headers = []): void - { - $sendHeaders = empty($headers) - ? new \stdClass() // this is required to represent empty header set as map and not as array - : $headers; - - $this->getWorker()->send( - $body, - (string) json_encode(['status' => $status, 'headers' => $sendHeaders]) - ); - } -} diff --git a/src/Metrics.php b/src/Metrics.php deleted file mode 100644 index d6b6e1da..00000000 --- a/src/Metrics.php +++ /dev/null @@ -1,80 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -use Spiral\Goridge\Exceptions\RPCException; -use Spiral\Goridge\RPC; -use Spiral\RoadRunner\Exception\MetricException; - -/** - * Application metrics. - */ -final class Metrics implements MetricsInterface -{ - /** @var RPC */ - private $rpc; - - /** - * @param RPC $rpc - */ - public function __construct(RPC $rpc) - { - $this->rpc = $rpc; - } - - /** - * @inheritDoc - */ - public function add(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function sub(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function observe(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function set(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } -} diff --git a/src/MetricsInterface.php b/src/MetricsInterface.php deleted file mode 100644 index ec2009b0..00000000 --- a/src/MetricsInterface.php +++ /dev/null @@ -1,64 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -use Spiral\RoadRunner\Exception\MetricException; - -interface MetricsInterface -{ - /** - * Add collector value. Fallback to appropriate method of related collector. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function add(string $collector, float $value, array $labels = []); - - /** - * Subtract the collector value, only for gauge collector. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function sub(string $collector, float $value, array $labels = []); - - /** - * Observe collector value, only for histogram and summary collectors. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function observe(string $collector, float $value, array $labels = []); - - /** - * Set collector value, only for gauge collector. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function set(string $collector, float $value, array $labels = []); -} diff --git a/src/Worker.php b/src/Worker.php deleted file mode 100644 index d509562e..00000000 --- a/src/Worker.php +++ /dev/null @@ -1,178 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -use Spiral\Goridge\Exceptions\GoridgeException; -use Spiral\Goridge\RelayInterface as Relay; -use Spiral\Goridge\SendPackageRelayInterface; -use Spiral\RoadRunner\Exception\RoadRunnerException; - -/** - * Accepts connection from RoadRunner server over given Goridge relay. - * - * Example: - * - * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT)); - * while ($task = $worker->receive($context)) { - * $worker->send("DONE", json_encode($context)); - * } - */ -class Worker -{ - // Send as response context to request worker termination - public const STOP = '{"stop":true}'; - - /** @var Relay */ - private $relay; - - /** - * @param Relay $relay - */ - public function __construct(Relay $relay) - { - $this->relay = $relay; - } - - /** - * Receive packet of information to process, returns null when process must be stopped. Might - * return Error to wrap error message from server. - * - * @param mixed $header - * @return \Error|null|string - * - * @throws GoridgeException - */ - public function receive(&$header) - { - $body = $this->relay->receiveSync($flags); - - if ($flags & Relay::PAYLOAD_CONTROL) { - if ($this->handleControl($body, $header, $flags)) { - // wait for the next command - return $this->receive($header); - } - - // no context for the termination. - $header = null; - - // Expect process termination - return null; - } - - if ($flags & Relay::PAYLOAD_ERROR) { - return new \Error((string)$body); - } - - return $body; - } - - /** - * Respond to the server with result of task execution and execution context. - * - * Example: - * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); - * - * @param string|null $payload - * @param string|null $header - */ - public function send(string $payload = null, string $header = null): void - { - if (!$this->relay instanceof SendPackageRelayInterface) { - if ($header === null) { - $this->relay->send('', Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); - } else { - $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); - } - - $this->relay->send((string)$payload, Relay::PAYLOAD_RAW); - } else { - $this->relay->sendPackage( - (string)$header, - Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW), - (string)$payload, - Relay::PAYLOAD_RAW - ); - } - } - - /** - * Respond to the server with an error. Error must be treated as TaskError and might not cause - * worker destruction. - * - * Example: - * - * $worker->error("invalid payload"); - * - * @param string $message - */ - public function error(string $message): void - { - $this->relay->send( - $message, - Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR - ); - } - - /** - * Terminate the process. Server must automatically pass task to the next available process. - * Worker will receive StopCommand context after calling this method. - * - * Attention, you MUST use continue; after invoking this method to let rr to properly - * stop worker. - * - * @throws GoridgeException - */ - public function stop(): void - { - $this->send(null, self::STOP); - } - - /** - * Handles incoming control command payload and executes it if required. - * - * @param string $body - * @param mixed $header Exported context (if any). - * @param int $flags - * @return bool True when continue processing. - * - * @throws RoadRunnerException - */ - private function handleControl(string $body = null, &$header = null, int $flags = 0): bool - { - $header = $body; - if ($body === null || $flags & Relay::PAYLOAD_RAW) { - // empty or raw prefix - return true; - } - - $p = json_decode($body, true); - if ($p === false) { - throw new RoadRunnerException('invalid task context, JSON payload is expected'); - } - - // PID negotiation (socket connections only) - if (!empty($p['pid'])) { - $this->relay->send( - sprintf('{"pid":%s}', getmypid()), - Relay::PAYLOAD_CONTROL - ); - } - - // termination request - if (!empty($p['stop'])) { - return false; - } - - // parsed header - $header = $p; - - return true; - } -} diff --git a/static_pool.go b/static_pool.go index b626a499..fbb2e5e8 100755 --- a/static_pool.go +++ b/static_pool.go @@ -162,7 +162,8 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { } // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + // TODO careful with string(rsp.Context) + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { sw.State().Set(StateInvalid) err = sw.Stop(bCtx) if err != nil { diff --git a/static_pool_test.go b/static_pool_test.go index 747f26c4..33799c40 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } @@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } @@ -129,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) - assert.Nil(t, res.Body) + assert.Empty(t, res.Body) assert.NotNil(t, res.Context) assert.Equal(t, "world", string(res.Context)) @@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) assert.Equal(t, runtime.NumCPU(), len(p.Workers())) @@ -293,7 +293,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.NotEqual(t, lastPID, string(res.Body)) lastPID = string(res.Body) @@ -332,7 +332,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.NotEqual(t, lastPID, string(res.Body)) lastPID = string(res.Body) @@ -372,7 +372,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.NotEqual(t, lastPID, string(res.Body)) lastPID = string(res.Body) diff --git a/supervisor_test.go b/supervisor_test.go index 08ea356d..d5d7d04c 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -142,7 +142,8 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { }) assert.NoError(t, err) - assert.Empty(t, resp) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) time.Sleep(time.Second * 1) // should be the same pid diff --git a/sync_worker.go b/sync_worker.go index 7e4d21cc..94a804a7 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -1,6 +1,7 @@ package roadrunner import ( + "bytes" "context" "time" @@ -8,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/util" "go.uber.org/multierr" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" ) var EmptyPayload = Payload{} @@ -134,37 +135,60 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, func (tw *syncWorker) execPayload(p Payload) (Payload, error) { const op = errors.Op("exec payload") - // two things; todo: merge - if err := sendControl(tw.w.Relay(), p.Context); err != nil { - return EmptyPayload, errors.E(op, err, "header error") - } - if err := tw.w.Relay().Send(p.Body, 0); err != nil { - return EmptyPayload, errors.E(op, err, "sender error") + frame := goridge.NewFrame() + frame.WriteVersion(goridge.VERSION_1) + // can be 0 here + + buf := new(bytes.Buffer) + buf.Write(p.Context) + buf.Write(p.Body) + + // Context offset + frame.WriteOptions(uint32(len(p.Context))) + frame.WritePayloadLen(uint32(buf.Len())) + frame.WritePayload(buf.Bytes()) + + frame.WriteCRC() + + // empty and free the buffer + buf.Truncate(0) + + err := tw.Relay().Send(frame) + if err != nil { + return EmptyPayload, err } - var pr goridge.Prefix - rsp := Payload{} + frameR := goridge.NewFrame() - var err error - if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil { - return EmptyPayload, errors.E(op, err, "WorkerProcess error") + err = tw.w.Relay().Receive(frameR) + if err != nil { + return EmptyPayload, errors.E(op, err) + } + if frameR == nil { + return EmptyPayload, errors.E(op, errors.Str("nil frame received")) } - if !pr.HasFlag(goridge.PayloadControl) { - return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response")) + if !frameR.VerifyCRC() { + return EmptyPayload, errors.E(op, errors.Str("failed to verify CRC")) } - if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context))) + flags := frameR.ReadFlags() + + if flags&byte(goridge.ERROR) != byte(0) { + return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } - // add streaming support :) - if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil { - return EmptyPayload, errors.E(op, err, "WorkerProcess error") + options := frameR.ReadOptions() + if len(options) != 1 { + return EmptyPayload, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - return rsp, nil + payload := Payload{} + payload.Context = frameR.Payload()[:options[0]] + payload.Body = frameR.Payload()[options[0]:] + + return payload, nil } func (tw *syncWorker) String() string { diff --git a/sync_worker_test.go b/sync_worker_test.go index 30b5bae8..0ef1e0cd 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -40,7 +40,7 @@ func Test_Echo(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } @@ -148,7 +148,7 @@ func Test_Echo_Slow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) } diff --git a/tests/broken.php b/tests/broken.php index 42b4e7c2..1f869b2d 100644 --- a/tests/broken.php +++ b/tests/broken.php @@ -8,7 +8,7 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { echo undefined_function(); - $rr->send((string)$in); + $rr->send((string)$in->body, null); } diff --git a/tests/client.php b/tests/client.php index 835b1c6c..c00cece1 100644 --- a/tests/client.php +++ b/tests/client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/composer.json b/tests/composer.json new file mode 100644 index 00000000..24702c37 --- /dev/null +++ b/tests/composer.json @@ -0,0 +1,12 @@ +{ + "minimum-stability": "beta", + "require": { + "nyholm/psr7": "^1.3", + "spiral/goridge": "^3.0@beta" + }, + "autoload": { + "psr-4": { + "Spiral\\RoadRunner\\": "src/" + } + } +} diff --git a/tests/delay.php b/tests/delay.php index bf9ecc12..f0435b05 100644 --- a/tests/delay.php +++ b/tests/delay.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - usleep($in * 1000); + usleep($in->body * 1000); $rr->send(''); } catch (\Throwable $e) { $rr->error((string)$e); diff --git a/tests/echo.php b/tests/echo.php index 1570e3df..83eec92e 100644 --- a/tests/echo.php +++ b/tests/echo.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send((string)$in); + $rr->send((string)$in->body); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/tests/error.php b/tests/error.php index 8e1c8d0d..c77e6817 100644 --- a/tests/error.php +++ b/tests/error.php @@ -8,6 +8,6 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { - $rr->error((string)$in); +while ($in = $rr->waitPayload()) { + $rr->error((string)$in->body); } diff --git a/tests/head.php b/tests/head.php index 88ebd3f2..3c57258f 100644 --- a/tests/head.php +++ b/tests/head.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send("", (string)$ctx); + $rr->send("", (string)$in->header); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/tests/http/client.php b/tests/http/client.php index 9f21b273..ad5cce24 100644 --- a/tests/http/client.php +++ b/tests/http/client.php @@ -4,7 +4,7 @@ use Spiral\Goridge; use Spiral\RoadRunner; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/../vendor_php/autoload.php"; +require dirname(__DIR__) . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); @@ -33,12 +33,18 @@ switch ($goridge) { die("invalid protocol selection"); } -$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay)); +$psr7 = new RoadRunner\Http\PSR7Worker( + new RoadRunner\Worker($relay), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + require_once sprintf("%s/%s.php", __DIR__, $test); -while ($req = $psr7->acceptRequest()) { +while ($req = $psr7->waitRequest()) { try { - $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response())); + $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response())); } catch (\Throwable $e) { $psr7->getWorker()->error((string)$e); } diff --git a/tests/http/slow-client.php b/tests/http/slow-client.php index 4d3963d7..731232f7 100644 --- a/tests/http/slow-client.php +++ b/tests/http/slow-client.php @@ -4,13 +4,13 @@ use Spiral\Goridge; use Spiral\RoadRunner; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/../vendor_php/autoload.php"; +require dirname(__DIR__) . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); } -list($test, $goridge, $bootDelay) = [$argv[1], $argv[2], $argv[3]]; +[$test, $goridge, $bootDelay] = [$argv[1], $argv[2], $argv[3]]; usleep($bootDelay * 1000); switch ($goridge) { @@ -34,13 +34,19 @@ switch ($goridge) { die("invalid protocol selection"); } -$psr7 = new RoadRunner\PSR7Client(new RoadRunner\Worker($relay)); +$psr7 = new RoadRunner\Http\PSR7Worker( + new RoadRunner\Worker($relay), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + require_once sprintf("%s/%s.php", __DIR__, $test); -while ($req = $psr7->acceptRequest()) { +while ($req = $psr7->waitRequest()) { try { - $psr7->respond(handleRequest($req, new \Zend\Diactoros\Response())); + $psr7->respond(handleRequest($req, new \Nyholm\Psr7\Response())); } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); + $psr7->getWorker()->error((string) $e); } } diff --git a/tests/memleak.php b/tests/memleak.php index b78a76c0..9a5376f0 100644 --- a/tests/memleak.php +++ b/tests/memleak.php @@ -5,11 +5,11 @@ declare(strict_types=1); use Spiral\Goridge\StreamRelay; use Spiral\RoadRunner\Worker as RoadRunner; -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); $mem = ''; -while($rr->receive($ctx)){ +while($rr->waitPayload()){ $mem .= str_repeat(" ", 1024*1024); $rr->send(""); -}
\ No newline at end of file +} diff --git a/tests/pid.php b/tests/pid.php index bf10a025..f8b2515d 100644 --- a/tests/pid.php +++ b/tests/pid.php @@ -8,7 +8,7 @@ $rr = new RoadRunner\Worker($relay); - while ($in = $rr->receive($ctx)) { + while ($in = $rr->waitPayload()) { try { $rr->send((string)getmypid()); } catch (\Throwable $e) { diff --git a/plugins/gzip/tests/psr-worker.php b/tests/psr-worker.php index ed936bde..db53eee2 100644 --- a/plugins/gzip/tests/psr-worker.php +++ b/tests/psr-worker.php @@ -6,18 +6,23 @@ use Spiral\Goridge; use Spiral\RoadRunner; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/../../vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); -$psr7 = new RoadRunner\PSR7Client($worker); +$psr7 = new RoadRunner\Http\PSR7Worker( + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); -while ($req = $psr7->acceptRequest()) { +while ($req = $psr7->waitRequest()) { try { - $resp = new \Zend\Diactoros\Response(); + $resp = new \Nyholm\Psr7\Response(); $resp->getBody()->write(str_repeat("hello world", 1000)); $psr7->respond($resp); } catch (\Throwable $e) { $psr7->getWorker()->error((string)$e); } -}
\ No newline at end of file +} diff --git a/tests/sleep.php b/tests/sleep.php index b3ea8235..e34a6834 100644 --- a/tests/sleep.php +++ b/tests/sleep.php @@ -5,11 +5,11 @@ declare(strict_types=1); use Spiral\Goridge\StreamRelay; use Spiral\RoadRunner\Worker as RoadRunner; -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); -while($rr->receive($ctx)){ +while($rr->waitPayload()){ sleep(3); $rr->send(""); -}
\ No newline at end of file +} diff --git a/tests/slow-client.php b/tests/slow-client.php index ece0a439..7737f0b1 100644 --- a/tests/slow-client.php +++ b/tests/slow-client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/slow-destroy.php b/tests/slow-destroy.php index e2a01af2..900bb68a 100644 --- a/tests/slow-destroy.php +++ b/tests/slow-destroy.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/slow-pid.php b/tests/slow-pid.php index 747e7e86..3660cb40 100644 --- a/tests/slow-pid.php +++ b/tests/slow-pid.php @@ -8,7 +8,7 @@ $rr = new RoadRunner\Worker($relay); - while ($in = $rr->receive($ctx)) { + while ($in = $rr->waitPayload()) { try { sleep(1); $rr->send((string)getmypid()); diff --git a/tests/src/Environment.php b/tests/src/Environment.php new file mode 100644 index 00000000..9b306063 --- /dev/null +++ b/tests/src/Environment.php @@ -0,0 +1,82 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\EnvironmentException; + +class Environment implements EnvironmentInterface +{ + /** @var array */ + private array $env; + + /** + * @param array $env + */ + public function __construct(array $env) + { + $this->env = $env; + } + + /** + * Returns worker mode assigned to the PHP process. + * + * @return string + * @throws EnvironmentException + */ + public function getMode(): string + { + return $this->getValue('RR_MODE'); + } + + /** + * Address worker should be connected to (or pipes). + * + * @return string + * @throws EnvironmentException + */ + public function getRelayAddress(): string + { + return $this->getValue('RR_RELAY'); + } + + /** + * RPC address. + * + * @return string + * @throws EnvironmentException + */ + public function getRPCAddress(): string + { + return $this->getValue('RR_RPC'); + } + + /** + * @param string $name + * @return string + * @throws EnvironmentException + */ + private function getValue(string $name): string + { + if (!isset($this->env[$name])) { + throw new EnvironmentException(sprintf("Missing environment value `%s`", $name)); + } + + return (string) $this->env[$name]; + } + + /** + * @return EnvironmentInterface + */ + public static function fromGlobals(): EnvironmentInterface + { + return new static(array_merge($_SERVER, $_ENV)); + } +} diff --git a/tests/src/EnvironmentInterface.php b/tests/src/EnvironmentInterface.php new file mode 100644 index 00000000..bc0ae043 --- /dev/null +++ b/tests/src/EnvironmentInterface.php @@ -0,0 +1,43 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\EnvironmentException; + +/** + * Provides base values to configure roadrunner worker. + */ +interface EnvironmentInterface +{ + /** + * Returns worker mode assigned to the PHP process. + * + * @return string + * @throws EnvironmentException + */ + public function getMode(): string; + + /** + * Address worker should be connected to (or pipes). + * + * @return string + * @throws EnvironmentException + */ + public function getRelayAddress(): string; + + /** + * RPC address. + * + * @return string + * @throws EnvironmentException + */ + public function getRPCAddress(): string; +} diff --git a/tests/src/Exception/EnvironmentException.php b/tests/src/Exception/EnvironmentException.php new file mode 100644 index 00000000..227507c5 --- /dev/null +++ b/tests/src/Exception/EnvironmentException.php @@ -0,0 +1,16 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner\Exception; + +class EnvironmentException extends RoadRunnerException +{ + +} diff --git a/src/Exception/RoadRunnerException.php b/tests/src/Exception/RoadRunnerException.php index f83c3dd4..2329370c 100644 --- a/src/Exception/RoadRunnerException.php +++ b/tests/src/Exception/RoadRunnerException.php @@ -1,14 +1,15 @@ <?php /** - * High-performance PHP process supervisor and load balancer written in Go + * High-performance PHP process supervisor and load balancer written in Go. * - * @author Wolfy-J + * @author Wolfy-J */ + declare(strict_types=1); namespace Spiral\RoadRunner\Exception; -class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException +class RoadRunnerException extends \RuntimeException { } diff --git a/tests/src/Http/HttpWorker.php b/tests/src/Http/HttpWorker.php new file mode 100644 index 00000000..ba045d87 --- /dev/null +++ b/tests/src/Http/HttpWorker.php @@ -0,0 +1,105 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Alex Bond + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Http; + +use Spiral\RoadRunner\WorkerInterface; + +class HttpWorker +{ + /** @var WorkerInterface */ + private WorkerInterface $worker; + + /** + * @param WorkerInterface $worker + */ + public function __construct(WorkerInterface $worker) + { + $this->worker = $worker; + } + + /** + * @return WorkerInterface + */ + public function getWorker(): WorkerInterface + { + return $this->worker; + } + + /** + * Wait for incoming http request. + * + * @return Request|null + */ + public function waitRequest(): ?Request + { + $payload = $this->getWorker()->waitPayload(); + if (empty($payload->body) && empty($payload->header)) { + // termination request + return null; + } + + $request = new Request(); + $request->body = $payload->body; + + $context = json_decode($payload->header, true); + if ($context === null) { + // invalid context + return null; + } + + $this->hydrateRequest($request, $context); + + return $request; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if ($headers === []) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } + + /** + * @param Request $request + * @param array $context + */ + private function hydrateRequest(Request $request, array $context): void + { + $request->remoteAddr = $context['remoteAddr']; + $request->protocol = $context['protocol']; + $request->method = $context['method']; + $request->uri = $context['uri']; + $request->attributes = $context['attributes'] ?? []; + $request->headers = $context['headers']; + $request->cookies = $context['cookies'] ?? []; + $request->uploads = $context['uploads'] ?? []; + + $request->query = []; + parse_str($context['rawQuery'], $request->query); + + // indicates that body was parsed + $request->parsed = $context['parsed']; + } +} diff --git a/src/PSR7Client.php b/tests/src/Http/PSR7Worker.php index 777dd891..f16c4847 100644 --- a/src/PSR7Client.php +++ b/tests/src/Http/PSR7Worker.php @@ -7,7 +7,7 @@ */ declare(strict_types=1); -namespace Spiral\RoadRunner; +namespace Spiral\RoadRunner\Http; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestFactoryInterface; @@ -15,100 +15,64 @@ use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\UploadedFileFactoryInterface; use Psr\Http\Message\UploadedFileInterface; +use Spiral\RoadRunner\WorkerInterface; /** * Manages PSR-7 request and response. */ -class PSR7Client +class PSR7Worker { - /** @var HttpClient */ - private $httpClient; - - /** @var ServerRequestFactoryInterface */ - private $requestFactory; - - /** @var StreamFactoryInterface */ - private $streamFactory; - - /** @var UploadedFileFactoryInterface */ - private $uploadsFactory; + private HttpWorker $httpWorker; + private ServerRequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + private UploadedFileFactoryInterface $uploadsFactory; /** @var mixed[] */ - private $originalServer = []; + private array $originalServer = []; /** @var string[] Valid values for HTTP protocol version */ - private static $allowedVersions = ['1.0', '1.1', '2',]; + private static array $allowedVersions = ['1.0', '1.1', '2',]; /** - * @param Worker $worker - * @param ServerRequestFactoryInterface|null $requestFactory - * @param StreamFactoryInterface|null $streamFactory - * @param UploadedFileFactoryInterface|null $uploadsFactory + * @param WorkerInterface $worker + * @param ServerRequestFactoryInterface $requestFactory + * @param StreamFactoryInterface $streamFactory + * @param UploadedFileFactoryInterface $uploadsFactory */ public function __construct( - Worker $worker, - ServerRequestFactoryInterface $requestFactory = null, - StreamFactoryInterface $streamFactory = null, - UploadedFileFactoryInterface $uploadsFactory = null + WorkerInterface $worker, + ServerRequestFactoryInterface $requestFactory, + StreamFactoryInterface $streamFactory, + UploadedFileFactoryInterface $uploadsFactory ) { - $this->httpClient = new HttpClient($worker); - $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); - $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); - $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); + $this->httpWorker = new HttpWorker($worker); + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + $this->uploadsFactory = $uploadsFactory; $this->originalServer = $_SERVER; } /** - * @return Worker + * @return WorkerInterface */ - public function getWorker(): Worker + public function getWorker(): WorkerInterface { - return $this->httpClient->getWorker(); + return $this->httpWorker->getWorker(); } /** * @return ServerRequestInterface|null */ - public function acceptRequest(): ?ServerRequestInterface + public function waitRequest(): ?ServerRequestInterface { - $rawRequest = $this->httpClient->acceptRequest(); - if ($rawRequest === null) { + $httpRequest = $this->httpWorker->waitRequest(); + if ($httpRequest === null) { return null; } - $_SERVER = $this->configureServer($rawRequest['ctx']); - - $request = $this->requestFactory->createServerRequest( - $rawRequest['ctx']['method'], - $rawRequest['ctx']['uri'], - $_SERVER - ); - - parse_str($rawRequest['ctx']['rawQuery'], $query); - - $request = $request - ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) - ->withCookieParams($rawRequest['ctx']['cookies']) - ->withQueryParams($query) - ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); - - foreach ($rawRequest['ctx']['attributes'] as $name => $value) { - $request = $request->withAttribute($name, $value); - } - - foreach ($rawRequest['ctx']['headers'] as $name => $value) { - $request = $request->withHeader($name, $value); - } - - if ($rawRequest['ctx']['parsed']) { - return $request->withParsedBody(json_decode($rawRequest['body'], true)); - } - - if ($rawRequest['body'] !== null) { - return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); - } + $_SERVER = $this->configureServer($httpRequest); - return $request; + return $this->mapRequest($httpRequest, $_SERVER); } /** @@ -118,7 +82,7 @@ class PSR7Client */ public function respond(ResponseInterface $response): void { - $this->httpClient->respond( + $this->httpWorker->respond( $response->getStatusCode(), $response->getBody()->__toString(), $response->getHeaders() @@ -129,21 +93,21 @@ class PSR7Client * Returns altered copy of _SERVER variable. Sets ip-address, * request-time and other values. * - * @param mixed[] $ctx + * @param Request $request * @return mixed[] */ - protected function configureServer(array $ctx): array + protected function configureServer(Request $request): array { $server = $this->originalServer; - $server['REQUEST_URI'] = $ctx['uri']; + $server['REQUEST_URI'] = $request->uri; $server['REQUEST_TIME'] = time(); $server['REQUEST_TIME_FLOAT'] = microtime(true); - $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; - $server['REQUEST_METHOD'] = $ctx['method']; + $server['REMOTE_ADDR'] = $request->getRemoteAddr(); + $server['REQUEST_METHOD'] = $request->method; $server['HTTP_USER_AGENT'] = ''; - foreach ($ctx['headers'] as $key => $value) { + foreach ($request->headers as $key => $value) { $key = strtoupper(str_replace('-', '_', $key)); if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { $server[$key] = implode(', ', $value); @@ -156,18 +120,52 @@ class PSR7Client } /** + * @param Request $httpRequest + * @param array $server + * @return ServerRequestInterface + */ + protected function mapRequest(Request $httpRequest, array $server): ServerRequestInterface + { + $request = $this->requestFactory->createServerRequest( + $httpRequest->method, + $httpRequest->uri, + $_SERVER + ); + + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($httpRequest->protocol)) + ->withCookieParams($httpRequest->cookies) + ->withQueryParams($httpRequest->query) + ->withUploadedFiles($this->wrapUploads($httpRequest->uploads)); + + foreach ($httpRequest->attributes as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($httpRequest->headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($httpRequest->parsed) { + return $request->withParsedBody($httpRequest->getParsedBody()); + } + + if ($httpRequest->body !== null) { + return $request->withBody($this->streamFactory->createStream($httpRequest->body)); + } + + return $request; + } + + /** * Wraps all uploaded files with UploadedFile. * * @param array[] $files - * * @return UploadedFileInterface[]|mixed[] */ - private function wrapUploads($files): array + protected function wrapUploads(array $files): array { - if (empty($files)) { - return []; - } - $result = []; foreach ($files as $index => $f) { if (!isset($f['name'])) { diff --git a/tests/src/Http/Request.php b/tests/src/Http/Request.php new file mode 100644 index 00000000..ef67e28d --- /dev/null +++ b/tests/src/Http/Request.php @@ -0,0 +1,48 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner\Http; + +final class Request +{ + + public string $remoteAddr; + public string $protocol; + public string $method; + public string $uri; + public array $headers; + public array $cookies; + public array $uploads; + public array $attributes; + public array $query; + public ?string $body; + public bool $parsed; + + /** + * @return string + */ + public function getRemoteAddr(): string + { + return $this->attributes['ipAddress'] ?? $this->remoteAddr ?? '127.0.0.1'; + } + + /** + * @return array|null + */ + public function getParsedBody(): ?array + { + if ($this->parsed) { + return json_decode($this->body, true); + } + + return null; + } +} diff --git a/tests/src/Payload.php b/tests/src/Payload.php new file mode 100644 index 00000000..c9b8c198 --- /dev/null +++ b/tests/src/Payload.php @@ -0,0 +1,43 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +/** + * Class Payload + * + * @package Spiral\RoadRunner + */ +final class Payload +{ + /** + * Execution payload (binary). + * + * @var string|null + */ + public ?string $body; + + /** + * Execution context (binary). + * + * @var string|null + */ + public ?string $header; + + /** + * @param string|null $body + * @param string|null $header + */ + public function __construct(?string $body, ?string $header = null) + { + $this->body = $body; + $this->header = $header; + } +} diff --git a/tests/src/Worker.php b/tests/src/Worker.php new file mode 100644 index 00000000..53cf6cef --- /dev/null +++ b/tests/src/Worker.php @@ -0,0 +1,162 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exception\GoridgeException; +use Spiral\Goridge\Frame; +use Spiral\Goridge\RelayInterface as Relay; +use Spiral\RoadRunner\Exception\EnvironmentException; +use Spiral\RoadRunner\Exception\RoadRunnerException; + +/** + * Accepts connection from RoadRunner server over given Goridge relay. + * + * $worker = Worker::create(); + * while ($p = $worker->waitPayload()) { + * $worker->send(new Payload("DONE", json_encode($context))); + * } + */ +class Worker implements WorkerInterface +{ + // Request graceful worker termination. + private const STOP_REQUEST = '{"stop":true}'; + + private Relay $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Wait for incoming payload from the server. Must return null when worker stopped. + * + * @return Payload|null + * @throws GoridgeException + * @throws RoadRunnerException + */ + public function waitPayload(): ?Payload + { + $frame = $this->relay->waitFrame(); + + if ($frame->hasFlag(Frame::CONTROL)) { + $continue = $this->handleControl($frame->payload); + + if ($continue) { + return $this->waitPayload(); + } else { + return null; + } + } + + return new Payload( + substr($frame->payload, $frame->options[0]), + substr($frame->payload, 0, $frame->options[0]) + ); + } + + /** + * Respond to the server with the processing result. + * + * @param Payload $payload + * @throws GoridgeException + */ + public function respond(Payload $payload): void + { + $this->send($payload->body, $payload->header); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message): void + { + $this->relay->send(new Frame($message, [], Frame::ERROR)); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive StopCommand context after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly + * stop worker. + * + * @throws GoridgeException + */ + public function stop(): void + { + $this->send("", self::STOP_REQUEST); + } + + /** + * @param string $body + * @param string|null $context + * @throws GoridgeException + */ + public function send(string $body, string $context = null): void + { + $this->relay->send(new Frame( + (string) $context . $body, + [strlen((string) $context)] + )); + } + + /** + * Return true if continue. + * + * @param string $header + * @return bool + * + * @throws RoadRunnerException + */ + private function handleControl(string $header): bool + { + $command = json_decode($header, true); + if ($command === false) { + throw new RoadRunnerException('Invalid task header, JSON payload is expected'); + } + + switch (true) { + case !empty($command['pid']): + $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL)); + return true; + + case !empty($command['stop']): + return false; + + default: + throw new RoadRunnerException('Invalid task header, undefined control package'); + } + } + + /** + * Create Worker using global environment configuration. + * + * @return WorkerInterface + * @throws EnvironmentException + */ + public static function create(): WorkerInterface + { + $env = Environment::fromGlobals(); + + return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress())); + } +} diff --git a/tests/src/WorkerInterface.php b/tests/src/WorkerInterface.php new file mode 100644 index 00000000..bf0b6e06 --- /dev/null +++ b/tests/src/WorkerInterface.php @@ -0,0 +1,55 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exception\GoridgeException; +use Spiral\RoadRunner\Exception\RoadRunnerException; + +interface WorkerInterface +{ + /** + * Wait for incoming payload from the server. Must return null when worker stopped. + * + * @return Payload|null + * @throws GoridgeException + * @throws RoadRunnerException + */ + public function waitPayload(): ?Payload; + + /** + * Respond to the server with the processing result. + * + * @param Payload $payload + * @throws GoridgeException + */ + public function respond(Payload $payload): void; + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $error + * @throws GoridgeException + */ + public function error(string $error): void; + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive stop command after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly stop worker. + */ + public function stop(): void; +} diff --git a/tests/stop.php b/tests/stop.php index 0100ad0f..f83d3f29 100644 --- a/tests/stop.php +++ b/tests/stop.php @@ -9,7 +9,7 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); $used = false; -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { if ($used) { // kill on second attempt @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "os/exec" "strconv" @@ -14,7 +15,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/util" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "go.uber.org/multierr" ) @@ -57,6 +58,13 @@ type WorkerEvent struct { Payload interface{} } +var pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 10240) + return &buf + }, +} + type WorkerBase interface { fmt.Stringer @@ -130,10 +138,12 @@ type WorkerProcess struct { endState *os.ProcessState // ensures than only one execution can be run at once. - mu sync.Mutex + mu sync.RWMutex // communication bus with underlying process. relay goridge.Relay + rd io.Reader + stop chan struct{} } // InitBaseWorker creates new WorkerProcess over given exec.cmd. @@ -147,13 +157,19 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { cmd: cmd, state: newState(StateInactive), stderr: new(bytes.Buffer), + stop: make(chan struct{}, 1), } + w.rd, w.cmd.Stderr = io.Pipe() + // small buffer optimization // at this point we know, that stderr will contain huge messages - w.stderr.Grow(1024) + w.stderr.Grow(10240) + + go func() { + w.watch() + }() - w.cmd.Stderr = w return w, nil } @@ -222,6 +238,7 @@ func (w *WorkerProcess) Start() error { func (w *WorkerProcess) Wait() error { const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first // and then w.cmd.Wait return an error @@ -229,10 +246,14 @@ func (w *WorkerProcess) Wait() error { if err != nil { w.state.Set(StateErrored) + w.mu.RLock() // if process return code > 0, here will be an error from stderr (if presents) if w.stderr.Len() > 0 { err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} } + w.mu.RUnlock() return multierr.Append(err, w.closeRelay()) } @@ -299,16 +320,48 @@ func (w *WorkerProcess) Kill() error { return nil } +// put the pointer, to not allocate new slice +// but erase it len and then return back +func (w *WorkerProcess) put(data *[]byte) { + *data = (*data)[:0] + *data = (*data)[:cap(*data)] + + pool.Put(data) +} + +// get pointer to the byte slice +func (w *WorkerProcess) get() *[]byte { + return pool.Get().(*[]byte) +} + // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. -func (w *WorkerProcess) Write(p []byte) (int, error) { - w.mu.Lock() - defer w.mu.Unlock() - // clean all previous messages in the stderr - w.stderr.Truncate(0) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p}) - // write new message - w.stderr.Write(p) - - return len(p), nil +func (w *WorkerProcess) watch() { + go func() { + for { + select { + case <-w.stop: + buf := w.get() + // read the last data + n, _ := w.rd.Read(*buf) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + return + default: + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(*buf) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + } + } + }() } diff --git a/worker_watcher.go b/worker_watcher.go index 8bc147d0..f8fb67a9 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -11,40 +11,46 @@ import ( ) type Stack struct { - workers []WorkerBase - mutex sync.RWMutex - destroy bool + workers []WorkerBase + mutex sync.RWMutex + destroy bool + actualNumOfWorkers int64 } func NewWorkersStack() *Stack { + w := runtime.NumCPU() return &Stack{ - workers: make([]WorkerBase, 0, runtime.NumCPU()), + workers: make([]WorkerBase, 0, w), + actualNumOfWorkers: 0, } } func (stack *Stack) Reset() { stack.mutex.Lock() defer stack.mutex.Unlock() - + stack.actualNumOfWorkers = 0 stack.workers = nil } +// Push worker back to the stack +// If stack in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w WorkerBase) { stack.mutex.Lock() defer stack.mutex.Unlock() + stack.actualNumOfWorkers++ stack.workers = append(stack.workers, w) } func (stack *Stack) IsEmpty() bool { stack.mutex.Lock() defer stack.mutex.Unlock() - return len(stack.workers) == 0 } func (stack *Stack) Pop() (WorkerBase, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() + // do not release new stack if stack.destroy { return nil, true @@ -54,12 +60,83 @@ func (stack *Stack) Pop() (WorkerBase, bool) { return nil, false } + // move worker w := stack.workers[len(stack.workers)-1] stack.workers = stack.workers[:len(stack.workers)-1] - + stack.actualNumOfWorkers-- return w, false } +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the stack, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// Workers return copy of the workers in the stack +func (stack *Stack) Workers() []WorkerBase { + stack.mutex.Lock() + defer stack.mutex.Unlock() + workersCopy := make([]WorkerBase, 0, 1) + // copy + for _, v := range stack.workers { + sw := v.(SyncWorker) + workersCopy = append(workersCopy, sw) + } + + return workersCopy +} + +func (stack *Stack) isDestroying() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return stack.destroy +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(ctx context.Context) { + stack.mutex.Lock() + stack.destroy = true + stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-tt.C: + stack.mutex.Lock() + // that might be one of the workers is working + if len(stack.workers) != int(stack.actualNumOfWorkers) { + stack.mutex.Unlock() + continue + } + stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.mutex.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + stack.workers[i].State().Set(StateDestroyed) + } + stack.mutex.Unlock() + tt.Stop() + // clear + stack.Reset() + return + } + } +} + type WorkerWatcher interface { // AddToWatch used to add stack to wait its state AddToWatch(workers []WorkerBase) error @@ -151,7 +228,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) if w == nil { continue } - ww.ReduceWorkersCount() return w, nil case <-ctx.Done(): return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) @@ -159,7 +235,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) } } - ww.ReduceWorkersCount() return w, nil } @@ -184,91 +259,41 @@ func (ww *workerWatcher) AllocateNew() error { } func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { - ww.stack.mutex.Lock() + ww.mutex.Lock() + defer ww.mutex.Unlock() + const op = errors.Op("remove worker") - defer ww.stack.mutex.Unlock() pid := wb.Pid() - for i := 0; i < len(ww.stack.workers); i++ { - if ww.stack.workers[i].Pid() == pid { - // found in the stack - // remove worker - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.ReduceWorkersCount() - - wb.State().Set(StateInvalid) - err := wb.Kill() - if err != nil { - return errors.E(op, err) - } - break + + if ww.stack.FindAndRemoveByPid(pid) { + wb.State().Set(StateInvalid) + err := wb.Kill() + if err != nil { + return errors.E(op, err) } + return nil } - // worker currently handle request, set state Remove + wb.State().Set(StateRemove) return nil } // O(1) operation func (ww *workerWatcher) PushWorker(w WorkerBase) { - ww.IncreaseWorkersCount() - ww.stack.Push(w) -} - -func (ww *workerWatcher) ReduceWorkersCount() { - ww.mutex.Lock() - ww.actualNumWorkers-- - ww.mutex.Unlock() -} -func (ww *workerWatcher) IncreaseWorkersCount() { ww.mutex.Lock() - ww.actualNumWorkers++ - ww.mutex.Unlock() + defer ww.mutex.Unlock() + ww.stack.Push(w) } // Destroy all underlying stack (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - ww.stack.mutex.Lock() - ww.stack.destroy = true - ww.stack.mutex.Unlock() - - tt := time.NewTicker(time.Millisecond * 100) - for { - select { - case <-tt.C: - ww.stack.mutex.Lock() - if len(ww.stack.workers) != int(ww.actualNumWorkers) { - ww.stack.mutex.Unlock() - continue - } - ww.stack.mutex.Unlock() - // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack - // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - ww.stack.workers[i].State().Set(StateDestroyed) - } - ww.stack.mutex.Unlock() - tt.Stop() - // clear - ww.stack.Reset() - return - } - } + // destroy stack, we don't use ww mutex here, since we should be able to push worker + ww.stack.Destroy(ctx) } // Warning, this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) WorkersList() []WorkerBase { - ww.stack.mutex.Lock() - defer ww.stack.mutex.Unlock() - workersCopy := make([]WorkerBase, 0, 1) - for _, v := range ww.stack.workers { - sw := v.(SyncWorker) - workersCopy = append(workersCopy, sw) - } - - return workersCopy + return ww.stack.Workers() } func (ww *workerWatcher) wait(w WorkerBase) { @@ -287,37 +312,13 @@ func (ww *workerWatcher) wait(w WorkerBase) { return } - pid := w.Pid() - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // worker in the stack, reallocating - if ww.stack.workers[i].Pid() == pid { - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.ReduceWorkersCount() - ww.stack.mutex.Unlock() - - err = ww.AllocateNew() - if err != nil { - ww.events.Push(PoolEvent{ - Event: EventPoolError, - Payload: errors.E(op, err), - }) - } - - return - } - } - - ww.stack.mutex.Unlock() - - // worker not in the stack (not returned), forget and allocate new + _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, Payload: errors.E(op, err), }) - return } } |