diff options
author | Valery Piashchynski <[email protected]> | 2021-01-25 15:43:15 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-25 15:43:15 +0300 |
commit | aa3a6a18c7d0cd08da0465377d22caa0cb5a4ff6 (patch) | |
tree | d8dbf5c76542546cab461837709874354e9a2445 | |
parent | 29d6020a9e8a3713b22269ed946547c96c24d3da (diff) | |
parent | 349aaf76c6203d5fa75dfc27c0409034cd91bb64 (diff) |
Merge pull request #495 from spiral/feature/stabilization
feat(all): stabilization
113 files changed, 1370 insertions, 1290 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/linux.yml index ed9d7f5a..27df293a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/linux.yml @@ -1,4 +1,4 @@ -name: build +name: Linux on: push: @@ -20,7 +20,7 @@ jobs: matrix: php: [ "7.4", "8.0" ] go: [ "1.14", "1.15" ] - os: [ ubuntu-latest, windows-latest, macos-latest ] + os: [ ubuntu-20.04 ] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go> @@ -37,12 +37,10 @@ jobs: uses: actions/checkout@v2 - name: Get Composer Cache Directory - if: ${{ matrix.os != 'windows-latest' }} id: composer-cache run: echo "::set-output name=dir::$(composer config cache-files-dir)" - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer> - if: ${{ matrix.os != 'windows-latest' }} uses: actions/cache@v2 with: path: ${{ steps.composer-cache.outputs.dir }} @@ -62,77 +60,13 @@ jobs: - name: Install Go dependencies run: go mod download - - name: Run golang tests on Windows without codecov - if: ${{ matrix.os == 'windows-latest' }} - run: | - docker-compose -f ./tests/docker-compose.yaml up -d - go test -v -race -tags=debug ./utils - go test -v -race -tags=debug ./pkg/pipe - go test -v -race -tags=debug ./pkg/pool - go test -v -race -tags=debug ./pkg/socket - go test -v -race -tags=debug ./pkg/worker - go test -v -race -tags=debug ./pkg/worker_watcher - go test -v -race -tags=debug ./tests/plugins/http - go test -v -race -tags=debug ./plugins/http/config - go test -v -race -tags=debug ./tests/plugins/informer - go test -v -race -tags=debug ./tests/plugins/reload - go test -v -race -tags=debug ./tests/plugins/server - go test -v -race -tags=debug ./tests/plugins/checker - go test -v -race -tags=debug ./tests/plugins/config - go test -v -race -tags=debug ./tests/plugins/gzip - go test -v -race -tags=debug ./tests/plugins/headers - go test -v -race -tags=debug ./tests/plugins/logger - go test -v -race -tags=debug ./tests/plugins/metrics - go test -v -race -tags=debug ./tests/plugins/redis - go test -v -race -tags=debug ./tests/plugins/resetter - go test -v -race -tags=debug ./tests/plugins/rpc - go test -v -race -tags=debug ./tests/plugins/static - go test -v -race -tags=debug ./plugins/kv/boltdb - go test -v -race -tags=debug ./plugins/kv/memory - go test -v -race -tags=debug ./plugins/kv/memcached - go test -v -race -tags=debug ./tests/plugins/kv/boltdb - go test -v -race -tags=debug ./tests/plugins/kv/memory - go test -v -race -tags=debug ./tests/plugins/kv/memcached - docker-compose -f ./tests/docker-compose.yaml down - - - name: Run golang tests on MacOS without codecov - if: ${{ matrix.os == 'macos-latest' }} - run: | - go test -v -race -tags=debug ./utils - go test -v -race -tags=debug ./pkg/pipe - go test -v -race -tags=debug ./pkg/pool - go test -v -race -tags=debug ./pkg/socket - go test -v -race -tags=debug ./pkg/worker - go test -v -race -tags=debug ./pkg/worker_watcher - go test -v -race -tags=debug ./plugins/http/config - go test -v -race -tags=debug ./tests/plugins/http - go test -v -race -tags=debug ./tests/plugins/informer - go test -v -race -tags=debug ./tests/plugins/reload - go test -v -race -tags=debug ./tests/plugins/server - go test -v -race -tags=debug ./tests/plugins/checker - go test -v -race -tags=debug ./tests/plugins/config - go test -v -race -tags=debug ./tests/plugins/gzip - go test -v -race -tags=debug ./tests/plugins/headers - go test -v -race -tags=debug ./tests/plugins/logger - go test -v -race -tags=debug ./tests/plugins/metrics - go test -v -race -tags=debug ./tests/plugins/redis - go test -v -race -tags=debug ./tests/plugins/resetter - go test -v -race -tags=debug ./tests/plugins/rpc - go test -v -race -tags=debug ./tests/plugins/static - go test -v -race -tags=debug ./plugins/kv/boltdb - go test -v -race -tags=debug ./plugins/kv/memory - go test -v -race -tags=debug ./tests/plugins/kv/boltdb - go test -v -race -tags=debug ./tests/plugins/kv/memory - - name: Run golang tests on Linux - if: ${{ matrix.os == 'ubuntu-latest' }} run: | docker-compose -f ./tests/docker-compose.yaml up -d mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/utils.txt -covermode=atomic ./utils - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/transport/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/transport/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config @@ -160,7 +94,6 @@ jobs: cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt - uses: codecov/codecov-action@v1 # Docs: <https://github.com/codecov/codecov-action> - if: ${{ matrix.os == 'ubuntu-latest' }} # codecov only from linux with: token: ${{ secrets.CODECOV_TOKEN }} file: ./coverage-ci/summary.txt @@ -168,7 +101,7 @@ jobs: golangci-lint: name: Golang-CI (lint) - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - name: Check out code uses: actions/checkout@v2 diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml new file mode 100644 index 00000000..77f9cfda --- /dev/null +++ b/.github/workflows/macos.yml @@ -0,0 +1,88 @@ +name: macOS + +on: + push: + pull_request: + branches: + # Branches from forks have the form 'user:branch-name' so we only run + # this job on pull_request events for branches that look like fork + # branches. Without this we would end up running this job twice for non + # forked PRs, once for the push and then once for opening the PR. + - "**:**" + +jobs: + golang: + name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) + runs-on: ${{ matrix.os }} + timeout-minutes: 60 + strategy: + fail-fast: false + matrix: + php: [ "7.4", "8.0" ] + go: [ "1.14", "1.15" ] + os: [ macos-latest ] + steps: + - name: Set up Go ${{ matrix.go }} + uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go> + with: + go-version: ${{ matrix.go }} + + - name: Set up PHP ${{ matrix.php }} + uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php> + with: + php-version: ${{ matrix.php }} + extensions: sockets + + - name: Check out code + uses: actions/checkout@v2 + + - name: Get Composer Cache Directory + id: composer-cache + run: echo "::set-output name=dir::$(composer config cache-files-dir)" + + - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer> + uses: actions/cache@v2 + with: + path: ${{ steps.composer-cache.outputs.dir }} + key: ${{ runner.os }}-composer-${{ matrix.php }}-${{ hashFiles('**/composer.json') }} + restore-keys: ${{ runner.os }}-composer- + + - name: Install Composer dependencies + run: cd tests && composer update --prefer-dist --no-progress --ansi + + - name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules> + uses: actions/cache@v2 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: ${{ runner.os }}-go- + + - name: Install Go dependencies + run: go mod download + + - name: Run golang tests + run: | + go test -v -race -tags=debug ./pkg/transport/pipe + go test -v -race -tags=debug ./pkg/transport/socket + go test -v -race -tags=debug ./pkg/pool + go test -v -race -tags=debug ./pkg/worker + go test -v -race -tags=debug ./pkg/worker_watcher + go test -v -race -tags=debug ./plugins/http/config + go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./tests/plugins/informer + go test -v -race -tags=debug ./tests/plugins/reload + go test -v -race -tags=debug ./tests/plugins/server + go test -v -race -tags=debug ./tests/plugins/checker + go test -v -race -tags=debug ./tests/plugins/config + go test -v -race -tags=debug ./tests/plugins/gzip + go test -v -race -tags=debug ./tests/plugins/headers + go test -v -race -tags=debug ./tests/plugins/logger + go test -v -race -tags=debug ./tests/plugins/metrics + go test -v -race -tags=debug ./tests/plugins/redis + go test -v -race -tags=debug ./tests/plugins/resetter + go test -v -race -tags=debug ./tests/plugins/rpc + go test -v -race -tags=debug ./tests/plugins/static + go test -v -race -tags=debug ./plugins/kv/boltdb + go test -v -race -tags=debug ./plugins/kv/memory + go test -v -race -tags=debug ./tests/plugins/kv/boltdb + go test -v -race -tags=debug ./tests/plugins/kv/memory diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml new file mode 100644 index 00000000..a58745e5 --- /dev/null +++ b/.github/workflows/windows.yml @@ -0,0 +1,81 @@ +name: windows + +on: + push: + pull_request: + branches: + # Branches from forks have the form 'user:branch-name' so we only run + # this job on pull_request events for branches that look like fork + # branches. Without this we would end up running this job twice for non + # forked PRs, once for the push and then once for opening the PR. + - "**:**" + +jobs: + golang: + name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) + runs-on: ${{ matrix.os }} + timeout-minutes: 60 + strategy: + fail-fast: false + matrix: + php: [ "7.4", "8.0" ] + go: [ "1.14", "1.15" ] + os: [ windows-latest ] + steps: + - name: Set up Go ${{ matrix.go }} + uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go> + with: + go-version: ${{ matrix.go }} + + - name: Set up PHP ${{ matrix.php }} + uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php> + with: + php-version: ${{ matrix.php }} + extensions: sockets + + - name: Check out code + uses: actions/checkout@v2 + + - name: Install Composer dependencies + run: cd tests && composer update --prefer-dist --no-progress --ansi + + - name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules> + uses: actions/cache@v2 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: ${{ runner.os }}-go- + + - name: Install Go dependencies + run: go mod download + + - name: Run golang tests on Windows without codecov + run: | + docker-compose -f ./tests/docker-compose.yaml up -d + go test -v -race -tags=debug ./pkg/transport/pipe + go test -v -race -tags=debug ./pkg/transport/socket + go test -v -race -tags=debug ./pkg/pool + go test -v -race -tags=debug ./pkg/worker + go test -v -race -tags=debug ./pkg/worker_watcher + go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./plugins/http/config + go test -v -race -tags=debug ./tests/plugins/informer + go test -v -race -tags=debug ./tests/plugins/reload + go test -v -race -tags=debug ./tests/plugins/server + go test -v -race -tags=debug ./tests/plugins/checker + go test -v -race -tags=debug ./tests/plugins/config + go test -v -race -tags=debug ./tests/plugins/gzip + go test -v -race -tags=debug ./tests/plugins/headers + go test -v -race -tags=debug ./tests/plugins/logger + go test -v -race -tags=debug ./tests/plugins/metrics + go test -v -race -tags=debug ./tests/plugins/redis + go test -v -race -tags=debug ./tests/plugins/resetter + go test -v -race -tags=debug ./tests/plugins/rpc + go test -v -race -tags=debug ./tests/plugins/static + go test -v -race -tags=debug ./plugins/kv/boltdb + go test -v -race -tags=debug ./plugins/kv/memory + go test -v -race -tags=debug ./plugins/kv/memcached + go test -v -race -tags=debug ./tests/plugins/kv/boltdb + go test -v -race -tags=debug ./tests/plugins/kv/memory + go test -v -race -tags=debug ./tests/plugins/kv/memcached + docker-compose -f ./tests/docker-compose.yaml down @@ -9,11 +9,11 @@ server: "RR_HTTP": "true" "RR_RPC": "tcp://127.0.0.1:6001" relay: "pipes" - relay_timeout: "20s" + relay_timeout: 20s logs: mode: development - level: debug + level: error http: # host and port separated by semicolon @@ -44,7 +44,6 @@ http: input: "custom-header" response: output: "output-header" - static: dir: "tests" forbid: [ "" ] @@ -59,24 +58,24 @@ http: allocate_timeout: 60s destroy_timeout: 60s supervisor: - # WatchTick defines how often to check the state of worker (seconds) - watch_tick: 1 - # TTL defines maximum time worker is allowed to live (seconds) + # watch_tick defines how often to check the state of the workers (seconds) + watch_tick: 1s + # ttl defines maximum time worker is allowed to live (seconds) ttl: 0 - # IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0 (seconds) - idle_ttl: 10 - # ExecTTL defines maximum lifetime per job (seconds) - exec_ttl: 10 - # MaxWorkerMemory limits memory per worker (MB) + # idle_ttl defines maximum duration worker can spend in idle mode after first use. Disabled when 0 (seconds) + idle_ttl: 10s + # exec_ttl defines maximum lifetime per job (seconds) + exec_ttl: 10s + # max_worker_memory limits memory usage per worker (MB) max_worker_memory: 100 - # ssl: - # host and port separated by semicolon (default :443) - # address: :8892 - # redirect: false - # cert: fixtures/server.crt - # key: fixtures/server.key - # rootCa: root.crt + ssl: + # host and port separated by semicolon (default :443) + address: :8892 + redirect: false + cert: fixtures/server.crt + key: fixtures/server.key + root_ca: root.crt fcgi: address: tcp://0.0.0.0:7921 http2: @@ -27,10 +27,9 @@ test_coverage: docker-compose -f tests/docker-compose.yaml up -d rm -rf coverage mkdir coverage - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/utils.out -covermode=atomic ./utils - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/transport/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/socket.out -covermode=atomic ./pkg/transport/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/socket.out -covermode=atomic ./pkg/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http @@ -59,10 +58,9 @@ test_coverage: test: ## Run application tests docker-compose -f tests/docker-compose.yaml up -d - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./utils - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool - go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/socket go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http @@ -88,14 +86,34 @@ test: ## Run application tests go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memcached docker-compose -f tests/docker-compose.yaml down -lint: ## Run application linters - golangci-lint run -kv: +test_1.14: ## Run application tests docker-compose -f tests/docker-compose.yaml up -d - go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb - go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memory - go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memcached - go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/boltdb - go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/memory - go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/memcached + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/pipe + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/transport/socket + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pool + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/worker_watcher + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/http + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/http/config + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/informer + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/reload + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/server + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/checker + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/config + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/gzip + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/headers + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/logger + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/metrics + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/redis + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/resetter + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/rpc + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/static + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/boltdb + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memory + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./plugins/kv/memcached + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/boltdb + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memory + go1.14.14 test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./tests/plugins/kv/memcached docker-compose -f tests/docker-compose.yaml down + +test_pipeline: test_1.14 test @@ -6,7 +6,9 @@ <p align="center"> <a href="https://packagist.org/packages/spiral/roadrunner"><img src="https://poser.pugx.org/spiral/roadrunner/version"></a> <a href="https://pkg.go.dev/github.com/spiral/roadrunner?tab=doc"><img src="https://godoc.org/github.com/spiral/roadrunner?status.svg"></a> - <a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/CI/badge.svg" alt=""></a> + <a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linux/badge.svg" alt=""></a> + <a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/macOS/badge.svg" alt=""></a> + <a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/windows/badge.svg" alt=""></a> <a href="https://goreportcard.com/report/github.com/spiral/roadrunner"><img src="https://goreportcard.com/badge/github.com/spiral/roadrunner"></a> <a href="https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master"><img src="https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png"></a> <a href="https://codecov.io/gh/spiral/roadrunner/"><img src="https://codecov.io/gh/spiral/roadrunner/branch/master/graph/badge.svg"></a> @@ -18,7 +20,8 @@ RoadRunner is an open-source (MIT licensed) high-performance PHP application server, load balancer, and process manager. It supports running as a service with the ability to extend its functionality on a per-project basis. -RoadRunner includes PSR-7/PSR-17 compatible HTTP and HTTP/2 server and can be used to replace classic Nginx+FPM setup with much greater performance and flexibility. +RoadRunner includes PSR-7/PSR-17 compatible HTTP and HTTP/2 server and can be used to replace classic Nginx+FPM setup +with much greater performance and flexibility. <p align="center"> <a href="https://roadrunner.dev/"><b>Official Website</b></a> | @@ -27,6 +30,7 @@ RoadRunner includes PSR-7/PSR-17 compatible HTTP and HTTP/2 server and can be us Features: -------- + - Production-ready - PCI DSS compliant - PSR-7 HTTP server (file uploads, error handling, static files, hot reload, middlewares, event listeners) @@ -63,11 +67,12 @@ go get -u github.com/spiral/roadrunner/v2 > For getting roadrunner binary file you can use our docker image: `spiralscout/roadrunner:X.X.X` (more information about image and tags can be found [here](https://hub.docker.com/r/spiralscout/roadrunner/)) -Configuration can be located in `.rr.yaml` file ([full sample](https://github.com/spiral/roadrunner/blob/master/.rr.yaml)): +Configuration can be located in `.rr.yaml` +file ([full sample](https://github.com/spiral/roadrunner/blob/master/.rr.yaml)): ```yaml http: - address: 0.0.0.0:8080 + address: 0.0.0.0:8080 workers.command: "php worker.php" ``` @@ -83,4 +88,5 @@ $ ./rr serve -v -d License: -------- -The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. Maintained by [Spiral Scout](https://spiralscout.com).
\ No newline at end of file +The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. Maintained +by [Spiral Scout](https://spiralscout.com).
\ No newline at end of file diff --git a/cmd/cli/root.go b/cmd/cli/root.go index 5e201daa..6f73aecf 100644 --- a/cmd/cli/root.go +++ b/cmd/cli/root.go @@ -16,7 +16,7 @@ import ( "net/http" "github.com/spf13/cobra" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" ) var ( diff --git a/cmd/main.go b/cmd/main.go index 68d0cadb..1dd19107 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,7 +3,7 @@ package main import ( "log" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/cmd/cli" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -22,7 +22,7 @@ import ( func main() { var err error - cli.Container, err = endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.RetryOnFail(false)) + cli.Container, err = endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.RetryOnFail(false)) if err != nil { log.Fatal(err) } diff --git a/codecov.yml b/codecov.yml index 45fccc1d..43716f56 100644 --- a/codecov.yml +++ b/codecov.yml @@ -17,5 +17,7 @@ ignore: - "plugins/kv/boltdb/plugin_unit_test.go" - "plugins/kv/memcached/plugin_unit_test.go" - "plugins/kv/memory/plugin_unit_test.go" + - "pkg/events/pool_events.go" + - "pkg/events/worker_events.go" - "interfaces" - "systemd"
\ No newline at end of file @@ -4,13 +4,11 @@ go 1.15 require ( github.com/NYTimes/gziphandler v1.1.1 - github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/alicebob/miniredis/v2 v2.14.1 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 github.com/dustin/go-humanize v1.0.0 github.com/fatih/color v1.10.0 - github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-redis/redis/v8 v8.4.4 github.com/gofiber/fiber/v2 v2.3.0 github.com/golang/mock v1.4.4 @@ -22,7 +20,7 @@ require ( github.com/shirou/gopsutil v3.20.11+incompatible github.com/spf13/cobra v1.1.1 github.com/spf13/viper v1.7.1 - github.com/spiral/endure v1.0.0-beta20 + github.com/spiral/endure v1.0.0-beta21 github.com/spiral/errors v1.0.9 github.com/spiral/goridge/v3 v3.0.0 github.com/stretchr/testify v1.6.1 @@ -87,8 +87,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= -github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= +github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc= github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= @@ -302,15 +302,16 @@ github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spiral/endure v1.0.0-beta20 h1:QD3EJ6CRLgeo/6trfnlUcQhH3vrK8Hvf9ceDpde+yss= -github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o= -github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/endure v1.0.0-beta21 h1:YW3gD6iNhRByG/yFkm/Ko+nj+oTBsjBtPVHFA2nt67k= +github.com/spiral/endure v1.0.0-beta21/go.mod h1:GsItn+dYSO4O5uwvfki23xyxRnmBhxEyL6jBeJQoFPw= github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM= github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8= github.com/spiral/goridge/v3 v3.0.0/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/roadrunner v1.9.2 h1:jGtXs3r5fevdbrkDF8BdFxEY4rIZwplnns1oWj7Vyw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -435,6 +436,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go deleted file mode 100644 index 97cc945c..00000000 --- a/interfaces/pool/pool.go +++ /dev/null @@ -1,100 +0,0 @@ -package pool - -import ( - "context" - "runtime" - "time" - - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// Pool managed set of inner worker processes. -type Pool interface { - // GetConfig returns pool configuration. - GetConfig() interface{} - - // Exec executes task with payload - Exec(rqs payload.Payload) (payload.Payload, error) - - // ExecWithContext executes task with context which is used with timeout - ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) - - // Workers returns worker list associated with the pool. - Workers() (workers []worker.BaseProcess) - - // Remove worker from the pool. - RemoveWorker(worker worker.BaseProcess) error - - // Destroy all underlying stack (but let them to complete the task). - Destroy(ctx context.Context) -} - -// Configures the pool behaviour. -type Config struct { - // Debug flag creates new fresh worker before every request. - Debug bool - - // NumWorkers defines how many sub-processes can be run at once. This value - // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers int64 - - // MaxJobs defines how many executions is allowed for the worker until - // it's destruction. set 1 to create new process for each new task, 0 to let - // worker handle as many tasks as it can. - MaxJobs int64 - - // AllocateTimeout defines for how long pool will be waiting for a worker to - // be freed to handle the task. Defaults to 60s. - AllocateTimeout time.Duration - - // DestroyTimeout defines for how long pool should be waiting for worker to - // properly destroy, if timeout reached worker will be killed. Defaults to 60s. - DestroyTimeout time.Duration - - // Supervision config to limit worker and pool memory usage. - Supervisor *SupervisorConfig -} - -// InitDefaults enables default config values. -func (cfg *Config) InitDefaults() { - if cfg.NumWorkers == 0 { - cfg.NumWorkers = int64(runtime.NumCPU()) - } - - if cfg.AllocateTimeout == 0 { - cfg.AllocateTimeout = time.Minute - } - - if cfg.DestroyTimeout == 0 { - cfg.DestroyTimeout = time.Minute - } - if cfg.Supervisor == nil { - return - } - cfg.Supervisor.InitDefaults() -} - -type SupervisorConfig struct { - // WatchTick defines how often to check the state of worker. - WatchTick uint64 - - // TTL defines maximum time worker is allowed to live. - TTL uint64 - - // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 - - // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 - - // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 -} - -// InitDefaults enables default config values. -func (cfg *SupervisorConfig) InitDefaults() { - if cfg.WatchTick == 0 { - cfg.WatchTick = 1 - } -} diff --git a/internal/state.go b/internal/state.go index 8f7d939b..a14a6937 100755 --- a/internal/state.go +++ b/internal/state.go @@ -13,7 +13,7 @@ type State interface { // Set sets the WorkerState Set(value int64) // NumJobs shows how many times WorkerProcess was invoked - NumExecs() int64 + NumExecs() uint64 // IsActive returns true if WorkerProcess not Inactive or Stopped IsActive() bool // RegisterExec using to registering php executions @@ -56,7 +56,7 @@ const ( type WorkerState struct { value int64 - numExecs int64 + numExecs uint64 // to be lightweight, use UnixNano lastUsed uint64 } @@ -87,8 +87,8 @@ func (s *WorkerState) String() string { } // NumExecs returns number of registered WorkerProcess execs. -func (s *WorkerState) NumExecs() int64 { - return atomic.LoadInt64(&s.numExecs) +func (s *WorkerState) NumExecs() uint64 { + return atomic.LoadUint64(&s.numExecs) } // Value WorkerState returns WorkerState value @@ -109,7 +109,7 @@ func (s *WorkerState) Set(value int64) { // register new execution atomically func (s *WorkerState) RegisterExec() { - atomic.AddInt64(&s.numExecs, 1) + atomic.AddUint64(&s.numExecs, 1) } // Update last used time diff --git a/pkg/doc/README.md b/pkg/doc/README.md new file mode 100644 index 00000000..4f726f4a --- /dev/null +++ b/pkg/doc/README.md @@ -0,0 +1,21 @@ +This is the drawio diagrams showing basic workflows inside RoadRunner 2.0 + +Simple HTTP workflow description: +![alt text](pool_workflow.svg) + +1. Allocate sync workers. When plugin starts (which use workers pool), then it allocates required number of processes + via `cmd.exec` command. + +2. When user send HTTP request to the RR2, HTTP plugin receive it and transfer to the workers pool `Exec/ExecWithContex` +method. And workers pool ask Worker watcher to get free worker. + +3. Workers watcher uses stack data structure under the hood and making POP operation to get first free worker. If there are +no workers in the `stack`, watcher waits for the specified via config (`allocate_timeout`) time. + +4. Stack returns free worker to the watcher. +5. Watcher returns that worker to the `pool`. +6. Pool invoke `Exec/ExecWithTimeout` method on the golang worker with provided request payload. +7. Golang worker send that request to the PHP worker via various set of transports (`pkg/transport` package). +8. PHP worker send back response to the golang worker (or error via stderr). +9. Golang worker return response payload to the pool. +10. Pool process this response and return answer to the user.
\ No newline at end of file diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio new file mode 100644 index 00000000..3f74d0fc --- /dev/null +++ b/pkg/doc/pool_workflow.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-01-24T16:29:37.978Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="8v25I0qLU_vMqqVrx7cN" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile>
\ No newline at end of file diff --git a/pkg/doc/pool_workflow.svg b/pkg/doc/pool_workflow.svg new file mode 100644 index 00000000..1e043eaa --- /dev/null +++ b/pkg/doc/pool_workflow.svg @@ -0,0 +1,3 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> +<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="1200px" height="811px" viewBox="-0.5 -0.5 1200 811" content="<mxfile host="Electron" modified="2021-01-24T14:08:35.229Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="XUF1gTxKr8mJfXGLcVd8" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile>" style="background-color: rgb(255, 255, 255);"><defs/><g><rect x="0" y="0" width="1199" height="810" fill="#ffffff" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe flex-start; width: 1197px; height: 1px; padding-top: 7px; margin-left: 2px;"><div style="box-sizing: border-box; font-size: 0; text-align: left; "><div style="display: inline-block; font-size: 12px; font-family: Courier New; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; "><h1>Simple User request</h1></div></div></div></foreignObject><text x="2" y="19" fill="#000000" font-family="Courier New" font-size="12px">Simple User request</text></switch></g><path d="M 417.5 574 L 417.5 657.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 662.88 L 414 655.88 L 417.5 657.63 L 421 655.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 296px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me sync worker (POP operation)</div></div></div></foreignObject><text x="296" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me sync worker (POP operation)</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">3</div></div></div></foreignObject><text x="418" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">3</text></switch></g><path d="M 492.5 514 L 492.5 430.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 425.12 L 496 432.12 L 492.5 430.37 L 489 432.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 493px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">5</div></div></div></foreignObject><text x="493" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">5</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 465px; margin-left: 535px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Get worker</div></div></div></foreignObject><text x="535" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Get worker</text></switch></g><rect x="380" y="514" width="150" height="60" fill="#ffe6cc" stroke="#d79b00" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 544px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Workers Watcher</div></div></div></foreignObject><text x="455" y="548" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Workers Watcher</text></switch></g><path d="M 280 424 L 280 544 L 373.63 544" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 378.88 544 L 371.88 547.5 L 373.63 544 L 371.88 540.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 515px; margin-left: 202px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Allocate sync workers</div></div></div></foreignObject><text x="202" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Allocate sync workers</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 514px; margin-left: 280px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">1</div></div></div></foreignObject><text x="280" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">1</text></switch></g><rect x="230" y="384" width="100" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 231px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Initialize</div></div></div></foreignObject><text x="280" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Initialize</text></switch></g><path d="M 417.5 424 L 417.5 507.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 512.88 L 414 505.88 L 417.5 507.63 L 421 505.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 352px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me SyncWorker</div></div></div></foreignObject><text x="352" y="467" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me SyncWorker</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">2</div></div></div></foreignObject><text x="418" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">2</text></switch></g><path d="M 530 414 L 700.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 705.88 414 L 698.88 417.5 L 700.63 414 L 698.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 415px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">6</div></div></div></foreignObject><text x="618" y="418" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">6</text></switch></g><path d="M 492.5 384 L 483 384 L 483 324 L 520 324 L 520 83 L 468.87 83" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 463.62 83 L 470.62 79.5 L 468.87 83 L 470.62 86.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 224px; margin-left: 521px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">10</div></div></div></foreignObject><text x="521" y="227" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">10</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 225px; margin-left: 597px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Send response to the user</div></div></div></foreignObject><text x="597" y="228" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Send response to the user</text></switch></g><rect x="380" y="384" width="150" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 404px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithContext</div></div></div></foreignObject><text x="455" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithContext</text></switch></g><path d="M 492.5 664 L 492.5 624 L 492.5 580.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 575.12 L 496 582.12 L 492.5 580.37 L 489 582.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 494px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">4</div></div></div></foreignObject><text x="494" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">4</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 610px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">I have free workers, here you are</div></div></div></foreignObject><text x="610" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">I have free workers, here you are</text></switch></g><rect x="380" y="664" width="150" height="60" fill="#d5e8d4" stroke="#82b366" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 694px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Stack with workers</div></div></div></foreignObject><text x="455" y="698" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Stack with workers</text></switch></g><path d="M 707 394 L 536.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 531.12 394 L 538.12 390.5 L 536.37 394 L 538.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">9</div></div></div></foreignObject><text x="618" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">9</text></switch></g><rect x="707" y="384" width="163" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 161px; height: 1px; padding-top: 404px; margin-left: 708px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithTimeout</div></div></div></foreignObject><text x="789" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithTimeout</text></switch></g><path d="M 450 289.5 L 460 289.5 L 460 364.5 L 470.5 364.5 L 455 383.5 L 439.5 364.5 L 450 364.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><ellipse cx="455" cy="84.5" rx="7.5" ry="7.5" fill="#ffffff" stroke="#000000" pointer-events="all"/><path d="M 455 92 L 455 117 M 455 97 L 440 97 M 455 97 L 470 97 M 455 117 L 440 137 M 455 117 L 470 137" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 144px; margin-left: 455px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">User request</div></div></div></foreignObject><text x="455" y="156" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">User...</text></switch></g><rect x="607" y="426" width="198" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 706px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Send request to the worker</div></div></div></foreignObject><text x="706" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Send request to the worker</text></switch></g><rect x="618" y="368" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 377px; margin-left: 681px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Receive response</div></div></div></foreignObject><text x="681" y="380" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Receive response</text></switch></g><ellipse cx="125" cy="404" rx="11" ry="11" fill="#000000" stroke="#ff0000" pointer-events="all"/><path d="M 140 404 L 227.76 404" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 219.88 408.5 L 228.88 404 L 219.88 399.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="all"/><rect x="45" y="371" width="161" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 380px; margin-left: 126px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Plugin Initialization</div></div></div></foreignObject><text x="126" y="383" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Plugin Initialization</text></switch></g><path d="M 870 414 L 983.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 988.88 414 L 981.88 417.5 L 983.63 414 L 981.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 413px; margin-left: 930px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">7</div></div></div></foreignObject><text x="930" y="416" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">7</text></switch></g><path d="M 990 394 L 876.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 871.12 394 L 878.12 390.5 L 876.37 394 L 878.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 931px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">8</div></div></div></foreignObject><text x="931" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">8</text></switch></g><rect x="990" y="384" width="100" height="40" fill="#f5f5f5" stroke="#666666" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 991px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #333333; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Worker</div></div></div></foreignObject><text x="1040" y="408" fill="#333333" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Worker</text></switch></g><rect x="893" y="426" width="96" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 941px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Exec payload</div></div></div></foreignObject><text x="941" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec payload</text></switch></g><rect x="873" y="366" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 375px; margin-left: 936px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Reveive response</div></div></div></foreignObject><text x="936" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Reveive response</text></switch></g><rect x="401" y="255" width="108" height="34" fill="#f8cecc" stroke="#b85450" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 106px; height: 1px; padding-top: 272px; margin-left: 402px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">HTTP plugin</div></div></div></foreignObject><text x="455" y="276" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">HTTP plugin</text></switch></g><path d="M 450 157.5 L 460 157.5 L 460 235.5 L 470.5 235.5 L 455 254.5 L 439.5 235.5 L 450 235.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><rect x="490" y="364" width="40" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 38px; height: 1px; padding-top: 374px; margin-left: 491px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Pool</div></div></div></foreignObject><text x="510" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Pool</text></switch></g><rect x="770" y="364" width="100" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 374px; margin-left: 771px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Golang Worker</div></div></div></foreignObject><text x="820" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Golang Worker</text></switch></g><rect x="1006" y="364" width="84" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 82px; height: 1px; padding-top: 374px; margin-left: 1007px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">PHP worker</div></div></div></foreignObject><text x="1048" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">PHP worker</text></switch></g></g><switch><g requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"/><a transform="translate(0,-5)" xlink:href="https://www.diagrams.net/doc/faq/svg-export-text-problems" target="_blank"><text text-anchor="middle" font-size="10px" x="50%" y="100%">Viewer does not support full SVG 1.1</text></a></switch></svg>
\ No newline at end of file diff --git a/pkg/events/events.go b/pkg/events/general.go index 226a0c91..a09a8759 100755 --- a/pkg/events/events.go +++ b/pkg/events/general.go @@ -2,18 +2,16 @@ package events import ( "sync" - - "github.com/spiral/roadrunner/v2/interfaces/events" ) // HandlerImpl helps to broadcast events to multiple listeners. type HandlerImpl struct { - listeners []events.Listener + listeners []Listener sync.RWMutex // all receivers should be pointers } -func NewEventsHandler() events.Handler { - return &HandlerImpl{listeners: make([]events.Listener, 0, 2)} +func NewEventsHandler() Handler { + return &HandlerImpl{listeners: make([]Listener, 0, 2)} } // NumListeners returns number of event listeners. @@ -24,7 +22,7 @@ func (eb *HandlerImpl) NumListeners() int { } // AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener events.Listener) { +func (eb *HandlerImpl) AddListener(listener Listener) { eb.Lock() defer eb.Unlock() eb.listeners = append(eb.listeners, listener) diff --git a/interfaces/events/handler.go b/pkg/events/interface.go index ac6c15a4..ac6c15a4 100644 --- a/interfaces/events/handler.go +++ b/pkg/events/interface.go diff --git a/interfaces/events/pool_events.go b/pkg/events/pool_events.go index 2cc76eee..3925df56 100644 --- a/interfaces/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -3,7 +3,7 @@ package events // TODO event numbers const ( // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 7800 + EventWorkerConstruct P = iota + 10000 // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct diff --git a/interfaces/events/worker_events.go b/pkg/events/worker_events.go index 2bff1811..9d428f7d 100644 --- a/interfaces/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -2,7 +2,7 @@ package events const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 200 + EventWorkerError W = iota + 11000 // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog diff --git a/pkg/pool/config.go b/pkg/pool/config.go index e3e2d3cd..782f7ce9 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -12,12 +12,12 @@ type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers int64 `mapstructure:"num_workers"` + NumWorkers uint64 `mapstructure:"num_workers"` // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. - MaxJobs int64 `mapstructure:"max_jobs"` + MaxJobs uint64 `mapstructure:"max_jobs"` // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. @@ -34,7 +34,7 @@ type Config struct { // InitDefaults enables default config values. func (cfg *Config) InitDefaults() { if cfg.NumWorkers == 0 { - cfg.NumWorkers = int64(runtime.NumCPU()) + cfg.NumWorkers = uint64(runtime.NumCPU()) } if cfg.AllocateTimeout == 0 { @@ -52,16 +52,16 @@ func (cfg *Config) InitDefaults() { type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. - WatchTick uint64 `mapstructure:"watch_tick"` + WatchTick time.Duration `mapstructure:"watch_tick"` // TTL defines maximum time worker is allowed to live. - TTL uint64 `mapstructure:"ttl"` + TTL time.Duration `mapstructure:"ttl"` // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 `mapstructure:"idle_ttl"` + IdleTTL time.Duration `mapstructure:"idle_ttl"` // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 `mapstructure:"exec_ttl"` + ExecTTL time.Duration `mapstructure:"exec_ttl"` // MaxWorkerMemory limits memory per worker. MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` @@ -70,6 +70,6 @@ type SupervisorConfig struct { // InitDefaults enables default config values. func (cfg *SupervisorConfig) InitDefaults() { if cfg.WatchTick == 0 { - cfg.WatchTick = 1 + cfg.WatchTick = time.Second } } diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go new file mode 100644 index 00000000..4f7ae595 --- /dev/null +++ b/pkg/pool/interface.go @@ -0,0 +1,29 @@ +package pool + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec executes task with payload + Exec(rqs payload.Payload) (payload.Payload, error) + + // ExecWithContext executes task with context which is used with timeout + ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []worker.SyncWorker) + + // Remove worker from the pool. + RemoveWorker(worker worker.SyncWorker) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 438f936f..44adf9c0 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -6,13 +6,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/transport" + "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) @@ -20,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) type Options func(p *StaticPool) @@ -34,7 +32,7 @@ type StaticPool struct { cmd Command // creates and connects to stack - factory worker.Factory + factory transport.Factory // distributes the events events events.Handler @@ -43,7 +41,7 @@ type StaticPool struct { listeners []events.Listener // manages worker states and TTLs - ww worker.Watcher + ww workerWatcher.Watcher // allocate new worker allocator worker.Allocator @@ -53,7 +51,7 @@ type StaticPool struct { } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) { +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) @@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co cfg: cfg, cmd: cmd, factory: factory, - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), } // add pool options @@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co } p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) - p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { @@ -124,16 +122,17 @@ func (sp *StaticPool) GetConfig() interface{} { } // Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { +func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { return sp.ww.WorkersList() } -func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { +func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { return sp.ww.RemoveWorker(wb) } +// Be careful, sync Exec with ExecWithContext func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec") + const op = errors.Op("static_pool_exec") if sp.cfg.Debug { return sp.execDebug(p) } @@ -152,28 +151,21 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { // worker want's to be terminated // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { - w.State().Set(internal.StateInvalid) - err = w.Stop() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) - } + sp.stopWorker(w) return sp.Exec(p) } - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew() - if err != nil { - return payload.Payload{}, errors.E(op, err) - } - } else { - sp.ww.PushWorker(w) + err = sp.checkMaxJobs(w) + if err != nil { + return payload.Payload{}, errors.E(op, err) } return rsp, nil } -func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { +// Be careful, sync with pool.Exec method +func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) defer cancel() @@ -182,32 +174,46 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) return payload.Payload{}, errors.E(op, err) } - rsp, err := w.ExecWithTimeout(ctx, rqs) + rsp, err := w.ExecWithTimeout(ctx, p) if err != nil { return sp.errEncoder(err, w) } // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(internal.StateInvalid) - err = w.Stop() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) - } + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + sp.stopWorker(w) + return sp.ExecWithContext(ctx, p) + } + + err = sp.checkMaxJobs(w) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } - return sp.ExecWithContext(ctx, rqs) + return rsp, nil +} + +func (sp *StaticPool) stopWorker(w worker.SyncWorker) { + const op = errors.Op("static_pool_stop_worker") + w.State().Set(internal.StateInvalid) + err := w.Stop() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } +} +// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs +func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { + const op = errors.Op("static_pool_check_max_jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew() + err := sp.ww.AllocateNew() if err != nil { - return payload.Payload{}, errors.E(op, err) + return errors.E(op, err) } } else { sp.ww.PushWorker(w) } - - return rsp, nil + return nil } func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { @@ -222,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke // else if err not nil - return error return nil, errors.E(op, err) } - return w.(worker.SyncWorker), nil + return w, nil } // Destroy all underlying stack (but let them to complete the task). @@ -231,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.SyncWorker) (payload.Payload, error) { const op = errors.Op("error encoder") // just push event if on any stage was timeout error if errors.Is(errors.ExecTTL, err) { @@ -269,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (worker.BaseProcess, error) { +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (*worker.SyncWorkerImpl, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) @@ -278,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return nil, err } - sw, err := syncWorker.From(w) - if err != nil { - return nil, err - } + sw := worker.From(w) sp.events.Push(events.PoolEvent{ Event: events.EventWorkerConstruct, @@ -297,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, err } - r, err := sw.(worker.SyncWorker).Exec(p) + r, err := sw.Exec(p) if stopErr := sw.Stop(); stopErr != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) @@ -307,22 +310,18 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) { const op = errors.Op("allocate workers") - var workers []worker.BaseProcess + var workers []worker.SyncWorker // constant number of stack simplify logic - for i := int64(0); i < numWorkers; i++ { + for i := uint64(0); i < numWorkers; i++ { w, err := sp.allocator() if err != nil { return nil, errors.E(op, errors.WorkerAllocate, err) } - sw, err := syncWorker.From(w) - if err != nil { - return nil, errors.E(op, err) - } - workers = append(workers, sw) + workers = append(workers, w) } return workers, nil } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index f66895dc..a32790e0 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,15 +12,15 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/stretchr/testify/assert" ) var cfg = Config{ - NumWorkers: int64(runtime.NumCPU()), + NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, } @@ -489,6 +489,84 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { p.Destroy(context.Background()) } +func Test_StaticPool_NoFreeWorkers(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 1) + + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventNoFreeWorkers { + block <- struct{}{} + } + } + } + + p, err := Initialize( + ctx, + // sleep for the 3 seconds + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + Config{ + Debug: false, + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: nil, + }, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + go func() { + _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + }() + + time.Sleep(time.Second) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + <-block + + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand1(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand2(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + func Benchmark_Pool_Echo(b *testing.B) { ctx := context.Background() p, err := Initialize( @@ -518,7 +596,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), Config{ - NumWorkers: int64(runtime.NumCPU()), + NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 19cda759..2597b352 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -6,11 +6,10 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -20,7 +19,7 @@ const MB = 1024 * 1024 const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck type Supervised interface { - pool.Pool + Pool // Start used to start watching process for all pool workers Start() } @@ -28,12 +27,12 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig events events.Handler - pool pool.Pool + pool Pool stopCh chan struct{} mu *sync.RWMutex } -func supervisorWrapper(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, events: events, @@ -57,7 +56,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) defer cancel() go func() { res, err := sp.pool.ExecWithContext(ctx, rqs) @@ -101,13 +100,13 @@ func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervised) Workers() (workers []worker.BaseProcess) { +func (sp *supervised) Workers() (workers []worker.SyncWorker) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() } -func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { +func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { return sp.pool.RemoveWorker(worker) } @@ -117,7 +116,7 @@ func (sp *supervised) Destroy(ctx context.Context) { func (sp *supervised) Start() { go func() { - watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second) + watchTout := time.NewTicker(sp.cfg.WatchTick) for { select { case <-sp.stopCh: @@ -155,7 +154,7 @@ func (sp *supervised) control() { continue } - if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) @@ -210,7 +209,7 @@ func (sp *supervised) control() { // IdleTTL is 1 second. // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. - if int64(sp.cfg.IdleTTL)-res <= 0 { + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index b3358965..c67d5d91 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -6,21 +6,22 @@ import ( "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/spiral/roadrunner/v2/tools" "github.com/stretchr/testify/assert" ) var cfgSupervised = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 100, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 100 * time.Second, MaxWorkerMemory: 100, }, } @@ -73,14 +74,14 @@ func TestSupervisedPool_Exec(t *testing.T) { func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 1, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 1 * time.Second, MaxWorkerMemory: 100, }, } @@ -114,14 +115,14 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { func TestSupervisedPool_Idle(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 1, - ExecTTL: 100, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 1 * time.Second, + ExecTTL: 100 * time.Second, MaxWorkerMemory: 100, }, } @@ -155,14 +156,14 @@ func TestSupervisedPool_Idle(t *testing.T) { func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 4, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, MaxWorkerMemory: 100, }, } @@ -194,3 +195,54 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { // should be the same pid assert.Equal(t, pid, p.Workers()[0].Pid()) } + +func TestSupervisedPool_MaxMemoryReached(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 1, + }, + } + + block := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventMaxMemory { + block <- struct{}{} + } + } + } + + // constructed + // max memory + // constructed + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + AddListeners(listener), + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + <-block + p.Destroy(context.Background()) +} diff --git a/interfaces/worker/factory.go b/pkg/transport/interface.go index 376303df..299ac95f 100644 --- a/interfaces/worker/factory.go +++ b/pkg/transport/interface.go @@ -1,20 +1,21 @@ -package worker +package transport import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // Factory is responsible of wrapping given command into tasks WorkerProcess. type Factory interface { // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (BaseProcess, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (BaseProcess, error) + SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) // Close the factory and underlying connections. Close() error } diff --git a/pkg/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index b656eff8..dd7c5841 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -6,10 +6,9 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) @@ -19,22 +18,22 @@ type Factory struct{} // NewPipeFactory returns new factory instance and starts // listening -func NewPipeFactory() worker.Factory { +func NewPipeFactory() *Factory { return &Factory{} } type SpawnResult struct { - w worker.BaseProcess + w *worker.Process err error } // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { c := make(chan SpawnResult) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { c <- SpawnResult{ w: nil, @@ -113,9 +112,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { return nil, errors.E(op, err) } diff --git a/pkg/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 805a24ee..d4949c82 100644 --- a/pkg/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -8,15 +8,15 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func Test_GetState2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -28,14 +28,11 @@ func Test_GetState2(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } + assert.NoError(t, w.Stop()) } func Test_Kill2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) wg := &sync.WaitGroup{} @@ -58,7 +55,7 @@ func Test_Kill2(t *testing.T) { } func Test_Pipe_Start2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) assert.NoError(t, err) @@ -72,7 +69,7 @@ func Test_Pipe_Start2(t *testing.T) { } func Test_Pipe_StartError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err := cmd.Start() if err != nil { t.Errorf("error running the command: error %v", err) @@ -84,7 +81,7 @@ func Test_Pipe_StartError2(t *testing.T) { } func Test_Pipe_PipeError3(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -96,7 +93,7 @@ func Test_Pipe_PipeError3(t *testing.T) { } func Test_Pipe_PipeError4(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -108,7 +105,7 @@ func Test_Pipe_PipeError4(t *testing.T) { } func Test_Pipe_Failboot2(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) @@ -117,14 +114,14 @@ func Test_Pipe_Failboot2(t *testing.T) { } func Test_Pipe_Invalid2(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewPipeFactory().SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Pipe_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) @@ -136,10 +133,7 @@ func Test_Pipe_Echo2(t *testing.T) { } }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -152,7 +146,7 @@ func Test_Pipe_Echo2(t *testing.T) { } func Test_Pipe_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) @@ -163,10 +157,7 @@ func Test_Pipe_Broken2(t *testing.T) { assert.Error(t, err) }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -178,7 +169,7 @@ func Test_Pipe_Broken2(t *testing.T) { func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := f.SpawnWorker(cmd) go func() { if w.Wait() != nil { @@ -194,13 +185,11 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) + b.ReportAllocs() b.ResetTimer() go func() { @@ -224,7 +213,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { b.Fatal(err) @@ -237,10 +226,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -250,7 +236,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { b.Fatal(err) @@ -263,10 +249,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -276,28 +259,26 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { } func Test_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) + go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -308,26 +289,23 @@ func Test_Echo2(t *testing.T) { } func Test_BadPayload2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{}) + res, err := sw.Exec(payload.Payload{}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -337,7 +315,7 @@ func Test_BadPayload2(t *testing.T) { } func Test_String2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -350,13 +328,13 @@ func Test_String2(t *testing.T) { } }() - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") assert.Contains(t, w.String(), "ready") assert.Contains(t, w.String(), "numExecs: 0") } func Test_Echo_Slow2(t *testing.T) { - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -369,12 +347,9 @@ func Test_Echo_Slow2(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -385,7 +360,7 @@ func Test_Echo_Slow2(t *testing.T) { } func Test_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") data := "" mu := &sync.Mutex{} listener := func(event interface{}) { @@ -401,12 +376,9 @@ func Test_Broken2(t *testing.T) { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -421,7 +393,7 @@ func Test_Broken2(t *testing.T) { } func Test_Error2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -435,12 +407,9 @@ func Test_Error2(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -452,7 +421,7 @@ func Test_Error2(t *testing.T) { } func Test_NumExecs2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { @@ -465,26 +434,23 @@ func Test_NumExecs2(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(1), w.State().NumExecs()) + assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(2), w.State().NumExecs()) + assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(3), w.State().NumExecs()) + assert.Equal(t, uint64(3), w.State().NumExecs()) } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index a2731294..38166b85 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -9,16 +9,16 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func Test_GetState(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -38,7 +38,7 @@ func Test_GetState(t *testing.T) { func Test_Kill(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) wg := &sync.WaitGroup{} @@ -62,7 +62,7 @@ func Test_Kill(t *testing.T) { func Test_Pipe_Start(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) @@ -76,7 +76,7 @@ func Test_Pipe_Start(t *testing.T) { } func Test_Pipe_StartError(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err := cmd.Start() if err != nil { t.Errorf("error running the command: error %v", err) @@ -89,7 +89,7 @@ func Test_Pipe_StartError(t *testing.T) { } func Test_Pipe_PipeError(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -102,7 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) { } func Test_Pipe_PipeError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -115,7 +115,7 @@ func Test_Pipe_PipeError2(t *testing.T) { } func Test_Pipe_Failboot(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) @@ -125,7 +125,7 @@ func Test_Pipe_Failboot(t *testing.T) { } func Test_Pipe_Invalid(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -133,7 +133,7 @@ func Test_Pipe_Invalid(t *testing.T) { } func Test_Pipe_Echo(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -146,10 +146,7 @@ func Test_Pipe_Echo(t *testing.T) { } }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -162,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) { } func Test_Pipe_Broken(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -174,10 +171,7 @@ func Test_Pipe_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -189,7 +183,7 @@ func Test_Pipe_Broken(t *testing.T) { func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) go func() { if w.Wait() != nil { @@ -205,13 +199,11 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) + b.ReportAllocs() b.ResetTimer() go func() { @@ -235,7 +227,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -249,10 +241,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -262,7 +251,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") ctx := context.Background() w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -276,10 +265,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { } }() - sw, err := workerImpl.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -290,28 +276,25 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { func Test_Echo(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -323,26 +306,23 @@ func Test_Echo(t *testing.T) { func Test_BadPayload(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) go func() { - assert.NoError(t, syncWorker.Wait()) + assert.NoError(t, sw.Wait()) }() defer func() { - err := syncWorker.Stop() + err := sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } }() - res, err := syncWorker.Exec(payload.Payload{}) + res, err := sw.Exec(payload.Payload{}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -353,7 +333,7 @@ func Test_BadPayload(t *testing.T) { func Test_String(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -366,14 +346,14 @@ func Test_String(t *testing.T) { } }() - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") assert.Contains(t, w.String(), "ready") assert.Contains(t, w.String(), "numExecs: 0") } func Test_Echo_Slow(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -386,12 +366,9 @@ func Test_Echo_Slow(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -403,7 +380,7 @@ func Test_Echo_Slow(t *testing.T) { func Test_Broken(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") data := "" mu := &sync.Mutex{} listener := func(event interface{}) { @@ -419,12 +396,9 @@ func Test_Broken(t *testing.T) { t.Fatal(err) } - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -440,7 +414,7 @@ func Test_Broken(t *testing.T) { func Test_Error(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -454,12 +428,9 @@ func Test_Error(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -472,7 +443,7 @@ func Test_Error(t *testing.T) { func Test_NumExecs(t *testing.T) { ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -485,26 +456,23 @@ func Test_NumExecs(t *testing.T) { } }() - syncWorker, err := workerImpl.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(1), w.State().NumExecs()) + assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(2), w.State().NumExecs()) + assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(3), w.State().NumExecs()) + assert.Equal(t, uint64(3), w.State().NumExecs()) } diff --git a/pkg/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 8f99ff73..ccd2b0bf 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -11,10 +11,9 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" "golang.org/x/sync/errgroup" @@ -38,7 +37,7 @@ type Factory struct { // NewSocketServer returns Factory attached to a given socket listener. // tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory { +func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { f := &Factory{ ls: ls, tout: tout, @@ -80,18 +79,18 @@ func (f *Factory) listen() error { } type socketSpawn struct { - w worker.BaseProcess + w *worker.Process err error } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) go func() { ctx, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { c <- socketSpawn{ w: nil, @@ -145,9 +144,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("factory_spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { return nil, err } diff --git a/pkg/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 2f21e408..0e29e7d2 100644 --- a/pkg/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -26,7 +26,7 @@ func Test_Tcp_Start2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) assert.NoError(t, err) @@ -49,7 +49,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") f := NewSocketServer(ls, time.Minute) defer func() { @@ -82,7 +82,7 @@ func Test_Tcp_StartError2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err = cmd.Start() if err != nil { t.Errorf("error executing the command: error %v", err) @@ -106,7 +106,7 @@ func Test_Tcp_Failboot2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) @@ -127,7 +127,7 @@ func Test_Tcp_Invalid2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) assert.Error(t, err) @@ -147,7 +147,7 @@ func Test_Tcp_Broken2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -169,10 +169,7 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, err2) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) @@ -194,7 +191,7 @@ func Test_Tcp_Echo2(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) go func() { @@ -207,10 +204,7 @@ func Test_Tcp_Echo2(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -230,7 +224,7 @@ func Test_Unix_Start2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) assert.NoError(t, err) @@ -254,7 +248,7 @@ func Test_Unix_Failboot2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) @@ -270,7 +264,7 @@ func Test_Unix_Timeout2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) assert.Nil(t, w) @@ -286,7 +280,7 @@ func Test_Unix_Invalid2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) assert.Error(t, err) @@ -301,7 +295,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -322,10 +316,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, err) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -343,7 +334,7 @@ func Test_Unix_Echo2(t *testing.T) { assert.NoError(t, err) }() - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -359,10 +350,7 @@ func Test_Unix_Echo2(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -384,7 +372,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := f.SpawnWorker(cmd) if err != nil { @@ -409,7 +397,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { assert.NoError(b, err) }() - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -422,10 +410,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -452,7 +437,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := f.SpawnWorker(cmd) if err != nil { @@ -481,7 +466,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { @@ -494,10 +479,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { diff --git a/pkg/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index 983f3e8e..f55fc3dd 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -29,7 +29,7 @@ func Test_Tcp_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) @@ -54,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") f := NewSocketServer(ls, time.Minute) defer func() { @@ -89,7 +89,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") err = cmd.Start() if err != nil { t.Errorf("error executing the command: error %v", err) @@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -139,7 +139,7 @@ func Test_Tcp_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -162,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -184,7 +184,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -206,10 +206,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err2) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) @@ -233,7 +230,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) go func() { @@ -246,10 +243,7 @@ func Test_Tcp_Echo(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -275,7 +269,7 @@ func Test_Unix_Start(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) @@ -305,7 +299,7 @@ func Test_Unix_Failboot(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/failboot.php") + cmd := exec.Command("php", "../../../tests/failboot.php") w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -327,7 +321,7 @@ func Test_Unix_Timeout(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) @@ -349,7 +343,7 @@ func Test_Unix_Invalid(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/invalid.php") + cmd := exec.Command("php", "../../../tests/invalid.php") w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -370,7 +364,7 @@ func Test_Unix_Broken(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -391,10 +385,7 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -418,7 +409,7 @@ func Test_Unix_Echo(t *testing.T) { t.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -434,10 +425,7 @@ func Test_Unix_Echo(t *testing.T) { } }() - sw, err := worker.From(w) - if err != nil { - t.Fatal(err) - } + sw := worker.From(w) res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) @@ -465,7 +453,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -496,7 +484,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -509,10 +497,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { @@ -537,7 +522,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -564,7 +549,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { b.Skip("socket is busy") } - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { @@ -577,10 +562,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } }() - sw, err := worker.From(w) - if err != nil { - b.Fatal(err) - } + sw := worker.From(w) for n := 0; n < b.N; n++ { if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { diff --git a/interfaces/worker/worker.go b/pkg/worker/interface.go index 0ac82158..9d74ae10 100644 --- a/interfaces/worker/worker.go +++ b/pkg/worker/interface.go @@ -10,9 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" ) -// Allocator is responsible for worker allocation in the pool -type Allocator func() (BaseProcess, error) - type BaseProcess interface { fmt.Stringer diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 8314c039..1a0393fb 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,50 +8,67 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" ) -type syncWorker struct { - w worker.BaseProcess +// Allocator is responsible for worker allocation in the pool +type Allocator func() (*SyncWorkerImpl, error) + +type SyncWorkerImpl struct { + process *Process } // From creates SyncWorker from BaseProcess -func From(w worker.BaseProcess) (worker.SyncWorker, error) { - return &syncWorker{ - w: w, - }, nil +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + } +} + +// FromSync creates BaseProcess from SyncWorkerImpl +func FromSync(w *SyncWorkerImpl) BaseProcess { + return &Process{ + created: w.process.created, + events: w.process.events, + state: w.process.state, + cmd: w.process.cmd, + pid: w.process.pid, + stderr: w.process.stderr, + endState: w.process.endState, + relay: w.process.relay, + rd: w.process.rd, + } } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.w.State().Value() != internal.StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + if tw.process.State().Value() != internal.StateReady { + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() return rsp, nil } @@ -62,7 +79,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) @@ -75,24 +92,24 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - if tw.w.State().Value() != internal.StateReady { + if tw.process.State().Value() != internal.StateReady { c <- wexec{ payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(internal.StateWorking) + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.w.State().Set(internal.StateErrored) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() } c <- wexec{ payload: payload.Payload{}, @@ -101,8 +118,8 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p return } - tw.w.State().Set(internal.StateReady) - tw.w.State().RegisterExec() + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() c <- wexec{ payload: rsp, @@ -128,7 +145,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } } -func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() @@ -156,7 +173,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { frameR := frame.NewFrame() - err = tw.w.Relay().Receive(frameR) + err = tw.process.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -186,42 +203,42 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return pl, nil } -func (tw *syncWorker) String() string { - return tw.w.String() +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() } -func (tw *syncWorker) Pid() int64 { - return tw.w.Pid() +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() } -func (tw *syncWorker) Created() time.Time { - return tw.w.Created() +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() } -func (tw *syncWorker) State() internal.State { - return tw.w.State() +func (tw *SyncWorkerImpl) State() internal.State { + return tw.process.State() } -func (tw *syncWorker) Start() error { - return tw.w.Start() +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() } -func (tw *syncWorker) Wait() error { - return tw.w.Wait() +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() } -func (tw *syncWorker) Stop() error { - return tw.w.Stop() +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() } -func (tw *syncWorker) Kill() error { - return tw.w.Kill() +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() } -func (tw *syncWorker) Relay() relay.Relay { - return tw.w.Relay() +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() } -func (tw *syncWorker) AttachRelay(rl relay.Relay) { - tw.w.AttachRelay(rl) +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index 40988b06..df556e93 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -22,12 +22,9 @@ func Test_NotStarted_Exec(t *testing.T) { w, _ := InitBaseWorker(cmd) - syncWorker, err := From(w) - if err != nil { - t.Fatal(err) - } + sw := From(w) - res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index bf70d646..8fd71cca 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,10 +13,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -78,14 +76,14 @@ type Process struct { } // InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) { +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { const op = errors.Op("init_base_worker") if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } w := &Process{ created: time.Now(), - events: eventsPkg.NewEventsHandler(), + events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), @@ -198,7 +196,7 @@ func (w *Process) Wait() error { // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first - // and then w.cmd.Wait return an error + // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(internal.StateErrored) diff --git a/interfaces/worker/watcher.go b/pkg/worker_watcher/interface.go index ce2c1c5a..927aa270 100644 --- a/interfaces/worker/watcher.go +++ b/pkg/worker_watcher/interface.go @@ -1,16 +1,20 @@ -package worker +package worker_watcher //nolint:golint,stylecheck -import "context" +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) type Watcher interface { // AddToWatch used to add stack to wait its state - AddToWatch(workers []BaseProcess) error + AddToWatch(workers []worker.SyncWorker) error // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (BaseProcess, error) + GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) // PutWorker enqueues worker back - PushWorker(w BaseProcess) + PushWorker(w worker.SyncWorker) // AllocateNew used to allocate new worker and put in into the WorkerWatcher AllocateNew() error @@ -19,8 +23,8 @@ type Watcher interface { Destroy(ctx context.Context) // WorkersList return all stack w/o removing it from internal storage - WorkersList() []BaseProcess + WorkersList() []worker.SyncWorker // RemoveWorker remove worker from the stack - RemoveWorker(wb BaseProcess) error + RemoveWorker(wb worker.SyncWorker) error } diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index c87e8b65..d76f4d8f 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,12 +5,12 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []worker.BaseProcess + workers []*worker.SyncWorkerImpl mutex sync.RWMutex destroy bool actualNumOfWorkers uint64 @@ -20,7 +20,7 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]worker.BaseProcess, 0, w), + workers: make([]*worker.SyncWorkerImpl, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } @@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) { stack.mutex.Lock() defer stack.mutex.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) + stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) } func (stack *Stack) IsEmpty() bool { @@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool { return len(stack.workers) == 0 } -func (stack *Stack) Pop() (worker.BaseProcess, bool) { +func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() @@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.BaseProcess { +func (stack *Stack) Workers() []worker.SyncWorker { stack.mutex.Lock() defer stack.mutex.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) + workersCopy := make([]worker.SyncWorker, 0, 1) // copy for _, v := range stack.workers { - workersCopy = append(workersCopy, v) + if v != nil { + workersCopy = append(workersCopy, v) + } } return workersCopy diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 86af2043..5287a6dc 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -5,24 +5,25 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []worker.BaseProcess{}, stack.workers) + assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) } func TestStack_Push(t *testing.T) { stack := NewWorkersStack(1) - w, err := workerImpl.InitBaseWorker(&exec.Cmd{}) + w, err := worker.InitBaseWorker(&exec.Cmd{}) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) } @@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) _, _ = stack.Pop() @@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) { func TestStack_FindAndRemoveByPid(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.FindAndRemoveByPid(w.Pid()) @@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) assert.Equal(t, false, stack.IsEmpty()) @@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) { func TestStack_Workers(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) wrks := stack.Workers() assert.Equal(t, 1, len(wrks)) @@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) { func TestStack_Reset(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.Reset() assert.Equal(t, uint64(0), stack.actualNumOfWorkers) @@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) { func TestStack_Destroy(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Destroy(context.Background()) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } @@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) { func TestStack_DestroyWithWait(t *testing.T) { stack := NewWorkersStack(2) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Push(sw) assert.Equal(t, uint64(2), stack.actualNumOfWorkers) go func() { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b0d39165..753b61ee 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,15 +5,15 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(uint64(numWorkers)), + stack: NewWorkersStack(numWorkers), allocator: allocator, events: events, } @@ -28,18 +28,18 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { +func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.BaseProcess) { + go func(swc worker.SyncWorker) { ww.wait(swc) }(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error { return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { +func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { +func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,7 +127,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.BaseProcess { +func (ww *workerWatcher) WorkersList() []worker.SyncWorker { return ww.stack.Workers() } @@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { +func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { go func() { ww.wait(wb) }() diff --git a/plugins/checker/plugin.go b/plugins/checker/plugin.go index 407cb2fa..59a37613 100644 --- a/plugins/checker/plugin.go +++ b/plugins/checker/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/gofiber/fiber/v2" fiberLogger "github.com/gofiber/fiber/v2/middleware/logger" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go index bd689918..bfbc1af6 100644 --- a/plugins/http/config/http.go +++ b/plugins/http/config/http.go @@ -72,7 +72,7 @@ func (c *HTTP) InitDefaults() error { // default pool c.Pool = &poolImpl.Config{ Debug: false, - NumWorkers: int64(runtime.NumCPU()), + NumWorkers: uint64(runtime.NumCPU()), MaxJobs: 1000, AllocateTimeout: time.Second * 60, DestroyTimeout: time.Second * 60, @@ -146,26 +146,6 @@ func ParseCIDRs(subnets []string) (Cidrs, error) { return c, nil } -// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For -func (c *HTTP) IsTrusted(ip string) bool { - if c.Cidrs == nil { - return false - } - - i := net.ParseIP(ip) - if i == nil { - return false - } - - for _, cird := range c.Cidrs { - if cird.Contains(i) { - return true - } - } - - return false -} - // Valid validates the configuration. func (c *HTTP) Valid() error { const op = errors.Op("validation") diff --git a/plugins/http/config/ssl.go b/plugins/http/config/ssl.go index c33dbce4..eb2b72b5 100644 --- a/plugins/http/config/ssl.go +++ b/plugins/http/config/ssl.go @@ -23,7 +23,7 @@ type SSL struct { Cert string // Root CA file - RootCA string + RootCA string `mapstructure:"root_ca"` // internal host string diff --git a/plugins/http/handler.go b/plugins/http/handler.go index ecdcb2c0..0e7481b5 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,8 +10,8 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 249d2e57..4d64ac6d 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -13,11 +13,10 @@ import ( "sync" "github.com/hashicorp/go-multierror" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -104,7 +103,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se s.cfg.Env[RR_HTTP] = "true" - s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{ + s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, @@ -304,7 +303,12 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Workers returns associated pool workers func (s *Plugin) Workers() []worker.BaseProcess { - return s.pool.Workers() + workers := s.pool.Workers() + baseWorkers := make([]worker.BaseProcess, 0, len(workers)) + for i := 0; i < len(workers); i++ { + baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl))) + } + return baseWorkers } // Name returns endure.Named interface implementation @@ -322,7 +326,7 @@ func (s *Plugin) Reset() error { s.pool = nil var err error - s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{ + s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go index 27139ae1..8e3b922b 100644 --- a/plugins/informer/interface.go +++ b/plugins/informer/interface.go @@ -1,6 +1,6 @@ package informer -import "github.com/spiral/roadrunner/v2/interfaces/worker" +import "github.com/spiral/roadrunner/v2/pkg/worker" // Informer used to get workers from particular plugin or set of plugins type Informer interface { diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index 3391ee01..416c0112 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -1,9 +1,9 @@ package informer import ( - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go index 98b5681c..c036ae96 100644 --- a/plugins/informer/rpc.go +++ b/plugins/informer/rpc.go @@ -1,7 +1,7 @@ package informer import ( - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/tools" ) diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go index 890736f7..ad3843e7 100644 --- a/plugins/kv/boltdb/plugin_unit_test.go +++ b/plugins/kv/boltdb/plugin_unit_test.go @@ -521,7 +521,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { assert.True(t, keyTTL < 5) assert.True(t, key2TTL < 5) - time.Sleep(time.Second * 4) + time.Sleep(time.Second * 7) // ensure that storage is clean v, err = s.Has("key", "key2") diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go index 719770c0..141ede64 100644 --- a/plugins/logger/plugin.go +++ b/plugins/logger/plugin.go @@ -1,7 +1,7 @@ package logger import ( - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "go.uber.org/zap" diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index 859f3d24..fefe92bd 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -9,7 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go index d7c90d39..538cdb78 100644 --- a/plugins/metrics/rpc.go +++ b/plugins/metrics/rpc.go @@ -26,7 +26,7 @@ type Metric struct { // Add new metric to the designated collector. func (rpc *rpcServer) Add(m *Metric, ok *bool) error { const op = errors.Op("metrics_plugin_add") - rpc.log.Info("Adding metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + rpc.log.Info("adding metric", "name", m.Name, "value", m.Value, "labels", m.Labels) c, exist := rpc.svc.collectors.Load(m.Name) if !exist { rpc.log.Error("undefined collector", "collector", m.Name) @@ -70,14 +70,14 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) error { // RPC, set ok to true as return value. Need by rpc.Call reply argument *ok = true - rpc.log.Info("new metric successfully added", "name", m.Name, "labels", m.Labels, "value", m.Value) + rpc.log.Info("metric successfully added", "name", m.Name, "labels", m.Labels, "value", m.Value) return nil } // Sub subtract the value from the specific metric (gauge only). func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { const op = errors.Op("metrics_plugin_sub") - rpc.log.Info("Subtracting value from metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + rpc.log.Info("subtracting value from metric", "name", m.Name, "value", m.Value, "labels", m.Labels) c, exist := rpc.svc.collectors.Load(m.Name) if !exist { rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels) @@ -107,7 +107,7 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { default: return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name)) } - rpc.log.Info("Subtracting operation applied successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) + rpc.log.Info("subtracting operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) *ok = true return nil @@ -116,7 +116,7 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { // Observe the value (histogram and summary only). func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { const op = errors.Op("metrics_plugin_observe") - rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + rpc.log.Info("observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) c, exist := rpc.svc.collectors.Load(m.Name) if !exist { @@ -171,7 +171,7 @@ func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { // error func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error { const op = errors.Op("metrics_plugin_declare") - rpc.log.Info("Declaring new metric", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) + rpc.log.Info("declaring new metric", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) _, exist := rpc.svc.collectors.Load(nc.Name) if exist { rpc.log.Error("metric with provided name already exist", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) @@ -256,7 +256,7 @@ func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error { // Set the metric value (only for gaude). func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { const op = errors.Op("metrics_plugin_set") - rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + rpc.log.Info("observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) c, exist := rpc.svc.collectors.Load(m.Name) if !exist { diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go index 611cb363..ee0deda6 100644 --- a/plugins/resetter/plugin.go +++ b/plugins/resetter/plugin.go @@ -1,7 +1,7 @@ package resetter import ( - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 84ebb3d0..e13768f0 100644 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -5,7 +5,7 @@ import ( "net/rpc" "sync/atomic" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" @@ -86,8 +86,7 @@ func (s *Plugin) Serve() chan error { conn, err := s.listener.Accept() if err != nil { if atomic.LoadUint32(s.closed) == 1 { - // just log and continue, this is not a critical issue, we just called Stop - s.log.Warn("listener accept error, connection closed", "error", err) + // just continue, this is not a critical issue, we just called Stop return } diff --git a/plugins/server/config.go b/plugins/server/config.go index 92e6780a..a4b0d91c 100644 --- a/plugins/server/config.go +++ b/plugins/server/config.go @@ -26,112 +26,25 @@ type Config struct { RelayTimeout time.Duration `mapstructure:"relay_timeout"` } `mapstructure:"server"` + // we just need to know if the section exist, we don't need to read config from it RPC *struct { Listen string `mapstructure:"listen"` } `mapstructure:"rpc"` Logs *struct { - Mode string `mapstructure:"mode"` - Level string `mapstructure:"level"` } `mapstructure:"logs"` HTTP *struct { - Address string `mapstructure:"address"` - MaxRequestSize int `mapstructure:"max_request_size"` - Middleware []string `mapstructure:"middleware"` - Uploads struct { - Forbid []string `mapstructure:"forbid"` - } `mapstructure:"uploads"` - TrustedSubnets []string `mapstructure:"trusted_subnets"` - Pool struct { - NumWorkers int `mapstructure:"num_workers"` - MaxJobs int `mapstructure:"max_jobs"` - AllocateTimeout string `mapstructure:"allocate_timeout"` - DestroyTimeout string `mapstructure:"destroy_timeout"` - Supervisor struct { - WatchTick int `mapstructure:"watch_tick"` - TTL int `mapstructure:"ttl"` - IdleTTL int `mapstructure:"idle_ttl"` - ExecTTL int `mapstructure:"exec_ttl"` - MaxWorkerMemory int `mapstructure:"max_worker_memory"` - } `mapstructure:"supervisor"` - } `mapstructure:"pool"` - Ssl struct { - Port int `mapstructure:"port"` - Redirect bool `mapstructure:"redirect"` - Cert string `mapstructure:"cert"` - Key string `mapstructure:"key"` - } `mapstructure:"ssl"` - Fcgi struct { - Address string `mapstructure:"address"` - } `mapstructure:"fcgi"` - HTTP2 struct { - Enabled bool `mapstructure:"enabled"` - H2C bool `mapstructure:"h2c"` - MaxConcurrentStreams int `mapstructure:"max_concurrent_streams"` - } `mapstructure:"http2"` } `mapstructure:"http"` Redis *struct { - Addrs []string `mapstructure:"addrs"` - MasterName string `mapstructure:"master_name"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - DB int `mapstructure:"db"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - DialTimeout int `mapstructure:"dial_timeout"` - MaxRetries int `mapstructure:"max_retries"` - MinRetryBackoff int `mapstructure:"min_retry_backoff"` - MaxRetryBackoff int `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge int `mapstructure:"max_conn_age"` - ReadTimeout int `mapstructure:"read_timeout"` - WriteTimeout int `mapstructure:"write_timeout"` - PoolTimeout int `mapstructure:"pool_timeout"` - IdleTimeout int `mapstructure:"idle_timeout"` - IdleCheckFreq int `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` } `mapstructure:"redis"` Boltdb *struct { - Dir string `mapstructure:"dir"` - File string `mapstructure:"file"` - Bucket string `mapstructure:"bucket"` - Permissions int `mapstructure:"permissions"` - TTL int `mapstructure:"TTL"` } `mapstructure:"boltdb"` Memcached *struct { - Addr []string `mapstructure:"addr"` } `mapstructure:"memcached"` Memory *struct { - Enabled bool `mapstructure:"enabled"` - Interval int `mapstructure:"interval"` } `mapstructure:"memory"` Metrics *struct { - Address string `mapstructure:"address"` - Collect struct { - AppMetric struct { - Type string `mapstructure:"type"` - Help string `mapstructure:"help"` - Labels []string `mapstructure:"labels"` - Buckets []float64 `mapstructure:"buckets"` - Objectives []struct { - Num2 float64 `mapstructure:"2,omitempty"` - One4 float64 `mapstructure:"1.4,omitempty"` - } `mapstructure:"objectives"` - } `mapstructure:"app_metric"` - } `mapstructure:"collect"` } `mapstructure:"metrics"` Reload *struct { - Interval string `mapstructure:"interval"` - Patterns []string `mapstructure:"patterns"` - Services struct { - HTTP struct { - Recursive bool `mapstructure:"recursive"` - Ignore []string `mapstructure:"ignore"` - Patterns []string `mapstructure:"patterns"` - Dirs []string `mapstructure:"dirs"` - } `mapstructure:"http"` - } `mapstructure:"services"` } `mapstructure:"reload"` } diff --git a/plugins/server/interface.go b/plugins/server/interface.go index a2d8b92b..22f02685 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -4,10 +4,9 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // Env variables type alias @@ -16,6 +15,6 @@ type Env map[string]string // Server creates workers for the application. type Server interface { CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) - NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) + NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) + NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 0c9c49ea..9cdb8401 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,16 +8,16 @@ import ( "strings" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/transport" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" // core imports - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/pkg/pipe" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/socket" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/pkg/transport/socket" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/utils" ) @@ -33,7 +33,7 @@ const RR_RPC = "RR_RPC" //nolint:golint,stylecheck type Plugin struct { cfg Config log logger.Logger - factory worker.Factory + factory transport.Factory } // Init application provider. @@ -115,7 +115,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) { +func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) { const op = errors.Op("server_plugin_new_worker") list := make([]events.Listener, 0, len(listeners)) @@ -135,7 +135,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { const op = errors.Op("server_plugin_new_worker_pool") spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -148,7 +148,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en list = append(list, listeners...) } - p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...)) + p, err := pool.Initialize(ctx, spawnCmd, server.factory, opt, pool.AddListeners(list...)) if err != nil { return nil, errors.E(op, err) } @@ -157,7 +157,7 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en } // creates relay and worker factory. -func (server *Plugin) initFactory() (worker.Factory, error) { +func (server *Plugin) initFactory() (transport.Factory, error) { const op = errors.Op("server_plugin_init_factory") if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" { return pipe.NewPipeFactory(), nil diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go index fa2bcfbe..1687cf11 100644 --- a/plugins/static/plugin.go +++ b/plugins/static/plugin.go @@ -32,6 +32,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if !cfg.Has(RootPluginName) { return errors.E(op, errors.Disabled) } + err := cfg.UnmarshalKey(RootPluginName, &s.cfg) if err != nil { return errors.E(op, errors.Disabled, err) diff --git a/tests/composer.json b/tests/composer.json index d4f32be5..889b6808 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -1,5 +1,6 @@ { "minimum-stability": "beta", + "prefer-stable": true, "require": { "nyholm/psr7": "^1.3", "spiral/roadrunner": "^2.0", diff --git a/tests/mocks/mock_log.go b/tests/mocks/mock_log.go index e9631805..66c70b91 100644 --- a/tests/mocks/mock_log.go +++ b/tests/mocks/mock_log.go @@ -2,6 +2,7 @@ package mocks import ( "reflect" + "sync" "github.com/golang/mock/gomock" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -9,6 +10,7 @@ import ( // MockLogger is a mock of Logger interface. type MockLogger struct { + sync.Mutex ctrl *gomock.Controller recorder *MockLoggerMockRecorder } @@ -38,6 +40,8 @@ func (m *MockLogger) Init() error { // Debug mocks base method. func (m *MockLogger) Debug(msg string, keyvals ...interface{}) { + m.Lock() + defer m.Unlock() m.ctrl.T.Helper() varargs := []interface{}{msg} for _, a := range keyvals { @@ -48,6 +52,8 @@ func (m *MockLogger) Debug(msg string, keyvals ...interface{}) { // Warn mocks base method. func (m *MockLogger) Warn(msg string, keyvals ...interface{}) { + m.Lock() + defer m.Unlock() m.ctrl.T.Helper() varargs := []interface{}{msg} for _, a := range keyvals { @@ -58,6 +64,8 @@ func (m *MockLogger) Warn(msg string, keyvals ...interface{}) { // Info mocks base method. func (m *MockLogger) Info(msg string, keyvals ...interface{}) { + m.Lock() + defer m.Unlock() m.ctrl.T.Helper() varargs := []interface{}{msg} for _, a := range keyvals { @@ -68,6 +76,8 @@ func (m *MockLogger) Info(msg string, keyvals ...interface{}) { // Error mocks base method. func (m *MockLogger) Error(msg string, keyvals ...interface{}) { + m.Lock() + defer m.Unlock() m.ctrl.T.Helper() varargs := []interface{}{msg} for _, a := range keyvals { diff --git a/tests/plugins/checker/configs/.rr-checker-init.yaml b/tests/plugins/checker/configs/.rr-checker-init.yaml index 36130382..11804a21 100755 --- a/tests/plugins/checker/configs/.rr-checker-init.yaml +++ b/tests/plugins/checker/configs/.rr-checker-init.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6005 - disabled: false server: command: "php ../../http/client.php echo pipes" @@ -15,7 +14,7 @@ status: address: "127.0.0.1:34333" logs: mode: development - level: debug + level: error http: debug: true address: 127.0.0.1:11933 diff --git a/tests/plugins/checker/plugin_test.go b/tests/plugins/checker/plugin_test.go index c346d91a..5e391158 100644 --- a/tests/plugins/checker/plugin_test.go +++ b/tests/plugins/checker/plugin_test.go @@ -12,7 +12,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/tests/plugins/config/.rr.yaml b/tests/plugins/config/.rr.yaml index bad2846a..a6e80921 100755 --- a/tests/plugins/config/.rr.yaml +++ b/tests/plugins/config/.rr.yaml @@ -2,7 +2,6 @@ rpc: listen: tcp://localhost:6060 reload: - enabled: true interval: 1s patterns: [".php"] services: diff --git a/tests/plugins/config/config_test.go b/tests/plugins/config/config_test.go index 6d95ba70..364960db 100755 --- a/tests/plugins/config/config_test.go +++ b/tests/plugins/config/config_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/stretchr/testify/assert" ) diff --git a/tests/plugins/gzip/plugin_test.go b/tests/plugins/gzip/plugin_test.go index d525dcc6..3e3db0f8 100644 --- a/tests/plugins/gzip/plugin_test.go +++ b/tests/plugins/gzip/plugin_test.go @@ -7,9 +7,10 @@ import ( "sync" "syscall" "testing" + "time" "github.com/golang/mock/gomock" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -172,6 +173,7 @@ func TestMiddlewareNotExist(t *testing.T) { } }() + time.Sleep(time.Second) stopCh <- struct{}{} wg.Wait() } diff --git a/tests/plugins/headers/headers_plugin_test.go b/tests/plugins/headers/headers_plugin_test.go index a2ad3357..49d86b00 100644 --- a/tests/plugins/headers/headers_plugin_test.go +++ b/tests/plugins/headers/headers_plugin_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/headers" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -80,6 +80,7 @@ func TestHeadersInit(t *testing.T) { } }() + time.Sleep(time.Second) stopCh <- struct{}{} wg.Wait() } diff --git a/tests/plugins/http/configs/.rr-broken-pipes.yaml b/tests/plugins/http/configs/.rr-broken-pipes.yaml index bfcbf592..9b7d2d0b 100644 --- a/tests/plugins/http/configs/.rr-broken-pipes.yaml +++ b/tests/plugins/http/configs/.rr-broken-pipes.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../http/client.php broken pipes" diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml index 219bad19..e6b00b69 100644 --- a/tests/plugins/http/configs/.rr-env.yaml +++ b/tests/plugins/http/configs/.rr-env.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../http/client.php env pipes" diff --git a/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml b/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml index 05c3d40a..ab42f4fc 100644 --- a/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml +++ b/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml @@ -26,11 +26,10 @@ http: redirect: false cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt + # root_ca: root.crt fcgi: address: tcp://127.0.0.1:6921 http2: - enabled: false h2c: false maxConcurrentStreams: 128 logs: diff --git a/tests/plugins/http/configs/.rr-fcgi.yaml b/tests/plugins/http/configs/.rr-fcgi.yaml index cfd4b79b..bd5d01bd 100644 --- a/tests/plugins/http/configs/.rr-fcgi.yaml +++ b/tests/plugins/http/configs/.rr-fcgi.yaml @@ -26,11 +26,10 @@ http: redirect: false cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt + # root_ca: root.crt fcgi: address: tcp://0.0.0.0:6920 http2: - enabled: false h2c: false maxConcurrentStreams: 128 logs: diff --git a/tests/plugins/http/configs/.rr-h2c.yaml b/tests/plugins/http/configs/.rr-h2c.yaml index c2703182..2061a76b 100644 --- a/tests/plugins/http/configs/.rr-h2c.yaml +++ b/tests/plugins/http/configs/.rr-h2c.yaml @@ -21,7 +21,6 @@ http: allocate_timeout: 60s destroy_timeout: 60s http2: - enabled: true h2c: true maxConcurrentStreams: 128 logs: diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml new file mode 100644 index 00000000..3e392577 --- /dev/null +++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml @@ -0,0 +1,33 @@ +rpc: + listen: tcp://127.0.0.1:15432 +server: + command: "php ../../http/client.php echo pipes" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relay_timeout: "20s" + +http: + debug: true + address: 127.0.0.1:18888 + max_request_size: 1024 + middleware: [ "" ] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 1 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + supervisor: + watch_tick: 1s + ttl: 0 + idle_ttl: 5s + exec_ttl: 10s + max_worker_memory: 100 +logs: + mode: development + level: error
\ No newline at end of file diff --git a/tests/plugins/http/configs/.rr-http.yaml b/tests/plugins/http/configs/.rr-http.yaml index 30d31819..184a353c 100644 --- a/tests/plugins/http/configs/.rr-http.yaml +++ b/tests/plugins/http/configs/.rr-http.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../http/client.php echo pipes" diff --git a/tests/plugins/http/configs/.rr-init.yaml b/tests/plugins/http/configs/.rr-init.yaml index 01b90b44..77132b43 100644 --- a/tests/plugins/http/configs/.rr-init.yaml +++ b/tests/plugins/http/configs/.rr-init.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../http/client.php echo pipes" @@ -30,11 +29,10 @@ http: redirect: false cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt + # root_ca: root.crt fcgi: address: tcp://0.0.0.0:7921 http2: - enabled: false h2c: false maxConcurrentStreams: 128 logs: diff --git a/tests/plugins/http/configs/.rr-no-http.yaml b/tests/plugins/http/configs/.rr-no-http.yaml index 6466c950..a6747b5d 100644 --- a/tests/plugins/http/configs/.rr-no-http.yaml +++ b/tests/plugins/http/configs/.rr-no-http.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../http/client.php echo pipes" diff --git a/tests/plugins/http/configs/.rr-resetter.yaml b/tests/plugins/http/configs/.rr-resetter.yaml index 88c54858..a1ef27d1 100644 --- a/tests/plugins/http/configs/.rr-resetter.yaml +++ b/tests/plugins/http/configs/.rr-resetter.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../http/client.php echo pipes" diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index ee6f795d..e47dbd44 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -10,8 +10,8 @@ import ( "runtime" "strings" - "github.com/spiral/roadrunner/v2/pkg/pipe" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" @@ -23,10 +23,10 @@ import ( ) func TestHandler_Echo(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -38,7 +38,7 @@ func TestHandler_Echo(t *testing.T) { h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -74,10 +74,10 @@ func Test_HandlerErrors(t *testing.T) { } func TestHandler_Headers(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -86,13 +86,13 @@ func TestHandler_Headers(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8078", Handler: h} @@ -135,10 +135,10 @@ func TestHandler_Headers(t *testing.T) { } func TestHandler_Empty_User_Agent(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -147,13 +147,13 @@ func TestHandler_Empty_User_Agent(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8088", Handler: h} @@ -195,10 +195,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) { } func TestHandler_User_Agent(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -207,13 +207,13 @@ func TestHandler_User_Agent(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8088", Handler: h} @@ -255,10 +255,10 @@ func TestHandler_User_Agent(t *testing.T) { } func TestHandler_Cookies(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -267,13 +267,13 @@ func TestHandler_Cookies(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8079", Handler: h} @@ -320,10 +320,10 @@ func TestHandler_Cookies(t *testing.T) { } func TestHandler_JsonPayload_POST(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -332,13 +332,13 @@ func TestHandler_JsonPayload_POST(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8090", Handler: h} @@ -384,10 +384,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) { } func TestHandler_JsonPayload_PUT(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -396,13 +396,13 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8081", Handler: h} @@ -444,10 +444,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { } func TestHandler_JsonPayload_PATCH(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -456,13 +456,13 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8082", Handler: h} @@ -504,10 +504,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { } func TestHandler_FormData_POST(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -516,13 +516,13 @@ func TestHandler_FormData_POST(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8083", Handler: h} @@ -577,10 +577,10 @@ func TestHandler_FormData_POST(t *testing.T) { } func TestHandler_FormData_POST_Overwrite(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -589,13 +589,13 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8083", Handler: h} @@ -650,10 +650,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { } func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -662,13 +662,13 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8083", Handler: h} @@ -722,10 +722,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { } func TestHandler_FormData_PUT(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -734,13 +734,13 @@ func TestHandler_FormData_PUT(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":17834", Handler: h} @@ -794,10 +794,10 @@ func TestHandler_FormData_PUT(t *testing.T) { } func TestHandler_FormData_PATCH(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -806,13 +806,13 @@ func TestHandler_FormData_PATCH(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8085", Handler: h} @@ -866,10 +866,10 @@ func TestHandler_FormData_PATCH(t *testing.T) { } func TestHandler_Multipart_POST(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -878,13 +878,13 @@ func TestHandler_Multipart_POST(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8019", Handler: h} @@ -980,10 +980,10 @@ func TestHandler_Multipart_POST(t *testing.T) { } func TestHandler_Multipart_PUT(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -992,13 +992,13 @@ func TestHandler_Multipart_PUT(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8020", Handler: h} @@ -1094,10 +1094,10 @@ func TestHandler_Multipart_PUT(t *testing.T) { } func TestHandler_Multipart_PATCH(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1106,13 +1106,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8021", Handler: h} @@ -1210,10 +1210,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) { } func TestHandler_Error(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1222,13 +1222,13 @@ func TestHandler_Error(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1256,10 +1256,10 @@ func TestHandler_Error(t *testing.T) { } func TestHandler_Error2(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1268,13 +1268,13 @@ func TestHandler_Error2(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1302,10 +1302,10 @@ func TestHandler_Error2(t *testing.T) { } func TestHandler_Error3(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1314,13 +1314,13 @@ func TestHandler_Error3(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1361,10 +1361,10 @@ func TestHandler_Error3(t *testing.T) { } func TestHandler_ResponseDuration(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1373,13 +1373,13 @@ func TestHandler_ResponseDuration(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1422,10 +1422,10 @@ func TestHandler_ResponseDuration(t *testing.T) { } func TestHandler_ResponseDurationDelayed(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1434,13 +1434,13 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1482,10 +1482,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { } func TestHandler_ErrorDuration(t *testing.T) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1494,13 +1494,13 @@ func TestHandler_ErrorDuration(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(t, err) hs := &http.Server{Addr: ":8177", Handler: h} @@ -1556,10 +1556,10 @@ func TestHandler_IP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1568,13 +1568,13 @@ func TestHandler_IP(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} @@ -1617,10 +1617,10 @@ func TestHandler_XRealIP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1629,13 +1629,13 @@ func TestHandler_XRealIP(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8179", Handler: h} @@ -1683,10 +1683,10 @@ func TestHandler_XForwardedFor(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1695,13 +1695,13 @@ func TestHandler_XForwardedFor(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} @@ -1748,10 +1748,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1760,13 +1760,13 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { t.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, cidrs, pool) + }, cidrs, p) assert.NoError(t, err) hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} @@ -1796,11 +1796,11 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { } func BenchmarkHandler_Listen_Echo(b *testing.B) { - pool, err := poolImpl.Initialize(context.Background(), + p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ - NumWorkers: int64(runtime.NumCPU()), + pool.Config{ + NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, }) @@ -1808,13 +1808,13 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { b.Fatal(err) } defer func() { - pool.Destroy(context.Background()) + p.Destroy(context.Background()) }() h, err := httpPlugin.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, - }, nil, pool) + }, nil, p) assert.NoError(b, err) hs := &http.Server{Addr: ":8177", Handler: h} diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index 72ae05a0..4f99dbbb 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -17,9 +17,9 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -1221,6 +1221,125 @@ func TestHttpBrokenPipes(t *testing.T) { assert.Error(t, err) } +func TestHTTPSupervisedPool(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-http-supervised-pool.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + &informer.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("HTTPEchoTest", echoHTTP2) + // worker should be destructed (idle_ttl) + t.Run("HTTPInformerCompareWorkersTest", informerTest2) + + stopCh <- struct{}{} + wg.Wait() +} + +func echoHTTP2(t *testing.T) { + req, err := http.NewRequest("GET", "http://localhost:18888?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + err = r.Body.Close() + assert.NoError(t, err) +} + +// get worker +// sleep +// supervisor destroy worker +// compare pid's +func informerTest2(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:15432") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + pid := 0 + // WorkerList contains list of workers. + list := struct { + // Workers is list of workers. + Workers []tools.ProcessState `json:"workers"` + }{} + + err = client.Call("informer.Workers", "http", &list) + assert.NoError(t, err) + assert.Len(t, list.Workers, 1) + // save the pid + pid = list.Workers[0].Pid + time.Sleep(time.Second * 10) + + list = struct { + // Workers is list of workers. + Workers []tools.ProcessState `json:"workers"` + }{} + + err = client.Call("informer.Workers", "http", &list) + assert.NoError(t, err) + assert.Len(t, list.Workers, 1) + assert.NotEqual(t, list.Workers[0].Pid, pid) +} + func get(url string) (string, *http.Response, error) { r, err := http.Get(url) //nolint:gosec if err != nil { diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index e03638d2..dd986902 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -16,8 +16,8 @@ import ( "time" j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pipe" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml index e5853b21..e1edbb44 100644 --- a/tests/plugins/informer/.rr-informer.yaml +++ b/tests/plugins/informer/.rr-informer.yaml @@ -10,7 +10,6 @@ server: rpc: listen: tcp://127.0.0.1:6001 - disabled: false logs: mode: development level: error
\ No newline at end of file diff --git a/tests/plugins/informer/informer_test.go b/tests/plugins/informer/informer_test.go index d9fc2143..31e14ff4 100644 --- a/tests/plugins/informer/informer_test.go +++ b/tests/plugins/informer/informer_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -86,13 +86,14 @@ func TestInformerInit(t *testing.T) { }() time.Sleep(time.Second) - t.Run("InformerRpcTest", informerRPCTest) + t.Run("InformerWorkersRpcTest", informerWorkersRPCTest) + t.Run("InformerListRpcTest", informerListRPCTest) stopCh <- struct{}{} wg.Wait() } -func informerRPCTest(t *testing.T) { +func informerWorkersRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) @@ -106,3 +107,15 @@ func informerRPCTest(t *testing.T) { assert.NoError(t, err) assert.Len(t, list.Workers, 10) } + +func informerListRPCTest(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + // WorkerList contains list of workers. + list := make([]string, 0, 0) + + err = client.Call("informer.List", true, &list) + assert.NoError(t, err) + assert.Equal(t, "informer.plugin1", list[0]) +} diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index ba281d02..2300de89 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -4,22 +4,22 @@ import ( "context" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &poolImpl.SupervisorConfig{ - WatchTick: 60, - TTL: 1000, - IdleTTL: 10, - ExecTTL: 10, + Supervisor: &pool.SupervisorConfig{ + WatchTick: 60 * time.Second, + TTL: 1000 * time.Second, + IdleTTL: 10 * time.Second, + ExecTTL: 10 * time.Second, MaxWorkerMemory: 1000, }, } @@ -50,10 +50,16 @@ func (p1 *Plugin1) Name() string { } func (p1 *Plugin1) Workers() []worker.BaseProcess { - pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) + p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) if err != nil { panic(err) } - return pool.Workers() + workers := p.Workers() + baseWorkers := make([]worker.BaseProcess, 0, len(workers)) + for i := 0; i < len(workers); i++ { + baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl))) + } + + return baseWorkers } diff --git a/tests/plugins/kv/boltdb/configs/.rr-init.yaml b/tests/plugins/kv/boltdb/configs/.rr-init.yaml index 8cfa20e9..e4644511 100644 --- a/tests/plugins/kv/boltdb/configs/.rr-init.yaml +++ b/tests/plugins/kv/boltdb/configs/.rr-init.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../../psr-worker-bench.php" @@ -13,7 +12,7 @@ server: logs: mode: development - level: debug + level: error http: address: 127.0.0.1:44933 diff --git a/tests/plugins/kv/boltdb/plugin_test.go b/tests/plugins/kv/boltdb/plugin_test.go index 5548402d..3a4542ff 100644 --- a/tests/plugins/kv/boltdb/plugin_test.go +++ b/tests/plugins/kv/boltdb/plugin_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -23,7 +23,7 @@ import ( ) func TestBoltDb(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/kv/memcached/configs/.rr-init.yaml b/tests/plugins/kv/memcached/configs/.rr-init.yaml index 66ed75cf..fbca3250 100644 --- a/tests/plugins/kv/memcached/configs/.rr-init.yaml +++ b/tests/plugins/kv/memcached/configs/.rr-init.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../../psr-worker-bench.php" @@ -13,7 +12,7 @@ server: logs: mode: development - level: debug + level: error http: address: 127.0.0.1:44933 diff --git a/tests/plugins/kv/memcached/plugin_test.go b/tests/plugins/kv/memcached/plugin_test.go index d4cb58bb..3878ef67 100644 --- a/tests/plugins/kv/memcached/plugin_test.go +++ b/tests/plugins/kv/memcached/plugin_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -23,7 +23,7 @@ import ( ) func TestMemcache(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/kv/memory/configs/.rr-init.yaml b/tests/plugins/kv/memory/configs/.rr-init.yaml index e352fad2..8780a622 100644 --- a/tests/plugins/kv/memory/configs/.rr-init.yaml +++ b/tests/plugins/kv/memory/configs/.rr-init.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false server: command: "php ../../../psr-worker-bench.php" @@ -13,7 +12,7 @@ server: logs: mode: development - level: debug + level: error http: address: 127.0.0.1:44933 @@ -39,7 +38,5 @@ http: # in memory KV driver memory: - enabled: - true # keys ttl check interval interval: 1 diff --git a/tests/plugins/kv/memory/plugin_test.go b/tests/plugins/kv/memory/plugin_test.go index ee01fabb..528403d0 100644 --- a/tests/plugins/kv/memory/plugin_test.go +++ b/tests/plugins/kv/memory/plugin_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -23,7 +23,7 @@ import ( ) func TestInMemory(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/logger/.rr.yaml b/tests/plugins/logger/.rr.yaml index cb555ec3..5ab359d3 100644 --- a/tests/plugins/logger/.rr.yaml +++ b/tests/plugins/logger/.rr.yaml @@ -1,3 +1,3 @@ logs: mode: development - level: debug
\ No newline at end of file + level: error
\ No newline at end of file diff --git a/tests/plugins/logger/logger_test.go b/tests/plugins/logger/logger_test.go index cc788be3..63f233ee 100644 --- a/tests/plugins/logger/logger_test.go +++ b/tests/plugins/logger/logger_test.go @@ -6,7 +6,7 @@ import ( "sync" "testing" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/metrics/.rr-test.yaml b/tests/plugins/metrics/.rr-test.yaml index 37c50395..bc68b90f 100644 --- a/tests/plugins/metrics/.rr-test.yaml +++ b/tests/plugins/metrics/.rr-test.yaml @@ -1,6 +1,5 @@ rpc: listen: tcp://127.0.0.1:6001 - disabled: false metrics: # prometheus client address (path /metrics added automatically) diff --git a/tests/plugins/metrics/metrics_test.go b/tests/plugins/metrics/metrics_test.go index c94d51bc..b5a4fd4f 100644 --- a/tests/plugins/metrics/metrics_test.go +++ b/tests/plugins/metrics/metrics_test.go @@ -11,12 +11,14 @@ import ( "testing" "time" - "github.com/spiral/endure" + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/metrics" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" ) @@ -75,6 +77,7 @@ func TestMetricsInit(t *testing.T) { signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) tt := time.NewTimer(time.Second * 5) + defer tt.Stop() out, err := get() assert.NoError(t, err) @@ -138,6 +141,7 @@ func TestMetricsGaugeCollector(t *testing.T) { time.Sleep(time.Second) tt := time.NewTimer(time.Second * 5) + defer tt.Stop() out, err := get() assert.NoError(t, err) @@ -183,11 +187,77 @@ func TestMetricsDifferentRPCCalls(t *testing.T) { cfg.Prefix = "rr" cfg.Path = ".rr-test.yaml" + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", []string{"metrics"}).MinTimes(1) + + mockLogger.EXPECT().Info("adding metric", "name", "counter_CounterMetric", "value", gomock.Any(), "labels", []string{"type2", "section2"}).MinTimes(1) + mockLogger.EXPECT().Info("adding metric", "name", "histogram_registerHistogram", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("adding metric", "name", "sub_gauge_subVector", "value", gomock.Any(), "labels", []string{"core", "first"}).MinTimes(1) + mockLogger.EXPECT().Info("adding metric", "name", "sub_gauge_subMetric", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("adding metric", "name", "test_metrics_named_collector", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("metric successfully added", "name", "observe_observeMetricNotEnoughLabels", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "observe_observeMetric", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "counter_CounterMetric", "labels", []string{"type2", "section2"}, "value", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "counter_CounterMetric", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "histogram_registerHistogram", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "sub_gauge_subVector", "labels", []string{"core", "first"}, "value", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "sub_gauge_subVector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "sub_gauge_subMetric", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "sub_gauge_subMetric", "labels", gomock.Any(), "value", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "histogram_setOnHistogram", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "gauge_setWithoutLabels", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "gauge_missing_section_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "gauge_2_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "test_metrics_named_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "test_metrics_named_collector", "labels", gomock.Any(), "value", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("metric successfully added", "name", "user_gauge_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("declaring new metric", "name", "observe_observeMetricNotEnoughLabels", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "observe_observeMetric", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "counter_CounterMetric", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "histogram_registerHistogram", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "sub_gauge_subVector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "sub_gauge_subMetric", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "histogram_setOnHistogram", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "gauge_setWithoutLabels", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "gauge_missing_section_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "test_metrics_named_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "gauge_2_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("declaring new metric", "name", "user_gauge_collector", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("observing metric", "name", "observe_observeMetric", "value", gomock.Any(), "labels", []string{"test"}).MinTimes(1) + mockLogger.EXPECT().Info("observing metric", "name", "observe_observeMetric", "value", gomock.Any(), "labels", []string{"test", "test2"}).MinTimes(1) + mockLogger.EXPECT().Info("observing metric", "name", "gauge_setOnHistogram", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("observing metric", "name", "gauge_setWithoutLabels", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("observing metric", "name", "gauge_missing_section_collector", "value", gomock.Any(), "labels", []string{"missing"}).MinTimes(1) + mockLogger.EXPECT().Info("observing metric", "name", "user_gauge_collector", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("observing metric", "name", "gauge_2_collector", "value", gomock.Any(), "labels", []string{"core", "first"}).MinTimes(1) + + mockLogger.EXPECT().Info("observe operation finished successfully", "name", "observe_observeMetric", "labels", []string{"test", "test2"}, "value", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("set operation finished successfully", "name", "gauge_2_collector", "labels", []string{"core", "first"}, "value", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("set operation finished successfully", "name", "user_gauge_collector", "labels", gomock.Any(), "value", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("subtracting value from metric", "name", "sub_gauge_subVector", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("subtracting value from metric", "name", "sub_gauge_subMetric", "value", gomock.Any(), "labels", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("subtracting operation finished successfully", "name", "sub_gauge_subVector", "labels", gomock.Any(), "value", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("subtracting operation finished successfully", "name", "sub_gauge_subMetric", "labels", gomock.Any(), "value", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Error("failed to get metrics with label values", "collector", "gauge_missing_section_collector", "labels", []string{"missing"}).MinTimes(1) + mockLogger.EXPECT().Error("required labels for collector", "collector", "gauge_setWithoutLabels").MinTimes(1) + mockLogger.EXPECT().Error("failed to get metrics with label values", "collector", "observe_observeMetric", "labels", []string{"test"}).MinTimes(1) + err = cont.RegisterAll( cfg, &metrics.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, + mockLogger, ) assert.NoError(t, err) @@ -202,8 +272,10 @@ func TestMetricsDifferentRPCCalls(t *testing.T) { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + tt := time.NewTimer(time.Minute * 3) + defer tt.Stop() + go func() { - tt := time.NewTimer(time.Minute * 3) for { select { case e := <-ch: diff --git a/tests/plugins/mocks/mock_log.go b/tests/plugins/mocks/mock_log.go deleted file mode 100644 index e9631805..00000000 --- a/tests/plugins/mocks/mock_log.go +++ /dev/null @@ -1,150 +0,0 @@ -package mocks - -import ( - "reflect" - - "github.com/golang/mock/gomock" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// MockLogger is a mock of Logger interface. -type MockLogger struct { - ctrl *gomock.Controller - recorder *MockLoggerMockRecorder -} - -// MockLoggerMockRecorder is the mock recorder for MockLogger. -type MockLoggerMockRecorder struct { - mock *MockLogger -} - -// NewMockLogger creates a new mock instance. -func NewMockLogger(ctrl *gomock.Controller) *MockLogger { - mock := &MockLogger{ctrl: ctrl} - mock.recorder = &MockLoggerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockLogger) EXPECT() *MockLoggerMockRecorder { - return m.recorder -} - -func (m *MockLogger) Init() error { - mock := &MockLogger{ctrl: m.ctrl} - mock.recorder = &MockLoggerMockRecorder{mock} - return nil -} - -// Debug mocks base method. -func (m *MockLogger) Debug(msg string, keyvals ...interface{}) { - m.ctrl.T.Helper() - varargs := []interface{}{msg} - for _, a := range keyvals { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "Debug", varargs...) -} - -// Warn mocks base method. -func (m *MockLogger) Warn(msg string, keyvals ...interface{}) { - m.ctrl.T.Helper() - varargs := []interface{}{msg} - for _, a := range keyvals { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "Warn", varargs...) -} - -// Info mocks base method. -func (m *MockLogger) Info(msg string, keyvals ...interface{}) { - m.ctrl.T.Helper() - varargs := []interface{}{msg} - for _, a := range keyvals { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "Info", varargs...) -} - -// Error mocks base method. -func (m *MockLogger) Error(msg string, keyvals ...interface{}) { - m.ctrl.T.Helper() - varargs := []interface{}{msg} - for _, a := range keyvals { - varargs = append(varargs, a) - } - m.ctrl.Call(m, "Error", varargs...) -} - -// Warn indicates an expected call of Warn. -func (mr *MockLoggerMockRecorder) Warn(msg interface{}, keyvals ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{msg}, keyvals...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Warn", reflect.TypeOf((*MockLogger)(nil).Warn), varargs...) -} - -// Debug indicates an expected call of Debug. -func (mr *MockLoggerMockRecorder) Debug(msg interface{}, keyvals ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{msg}, keyvals...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Debug", reflect.TypeOf((*MockLogger)(nil).Debug), varargs...) -} - -// Error indicates an expected call of Error. -func (mr *MockLoggerMockRecorder) Error(msg interface{}, keyvals ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{msg}, keyvals...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockLogger)(nil).Error), varargs...) -} - -func (mr *MockLoggerMockRecorder) Init() error { - return nil -} - -// Info indicates an expected call of Info. -func (mr *MockLoggerMockRecorder) Info(msg interface{}, keyvals ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{msg}, keyvals...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockLogger)(nil).Info), varargs...) -} - -// MockWithLogger is a mock of WithLogger interface. -type MockWithLogger struct { - ctrl *gomock.Controller - recorder *MockWithLoggerMockRecorder -} - -// MockWithLoggerMockRecorder is the mock recorder for MockWithLogger. -type MockWithLoggerMockRecorder struct { - mock *MockWithLogger -} - -// NewMockWithLogger creates a new mock instance. -func NewMockWithLogger(ctrl *gomock.Controller) *MockWithLogger { - mock := &MockWithLogger{ctrl: ctrl} - mock.recorder = &MockWithLoggerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockWithLogger) EXPECT() *MockWithLoggerMockRecorder { - return m.recorder -} - -// With mocks base method. -func (m *MockWithLogger) With(keyvals ...interface{}) logger.Logger { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range keyvals { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "With", varargs...) - ret0, _ := ret[0].(logger.Logger) - return ret0 -} - -// With indicates an expected call of With. -func (mr *MockWithLoggerMockRecorder) With(keyvals ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "With", reflect.TypeOf((*MockWithLogger)(nil).With), keyvals...) -} diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go index eba05752..96a191a1 100644 --- a/tests/plugins/redis/redis_plugin_test.go +++ b/tests/plugins/redis/redis_plugin_test.go @@ -10,7 +10,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/golang/mock/gomock" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/redis" "github.com/spiral/roadrunner/v2/tests/mocks" diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go index 81ac3c44..9007541b 100644 --- a/tests/plugins/reload/reload_plugin_test.go +++ b/tests/plugins/reload/reload_plugin_test.go @@ -14,7 +14,7 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" diff --git a/tests/plugins/resetter/.rr-resetter.yaml b/tests/plugins/resetter/.rr-resetter.yaml index e5853b21..623ba142 100644 --- a/tests/plugins/resetter/.rr-resetter.yaml +++ b/tests/plugins/resetter/.rr-resetter.yaml @@ -10,7 +10,6 @@ server: rpc: listen: tcp://127.0.0.1:6001 - disabled: false logs: mode: development - level: error
\ No newline at end of file + level: debug
\ No newline at end of file diff --git a/tests/plugins/resetter/resetter_test.go b/tests/plugins/resetter/resetter_test.go index 89dd43c7..465d22dd 100644 --- a/tests/plugins/resetter/resetter_test.go +++ b/tests/plugins/resetter/resetter_test.go @@ -10,13 +10,14 @@ import ( "testing" "time" - "github.com/spiral/endure" + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" ) @@ -31,10 +32,26 @@ func TestResetterInit(t *testing.T) { Prefix: "rr", } + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", []string{"resetter"}).MinTimes(1) + + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Debug("started List method").MinTimes(1) + mockLogger.EXPECT().Debug("services list", "services", []string{"resetter.plugin1"}).MinTimes(1) + mockLogger.EXPECT().Debug("finished List method").MinTimes(1) + mockLogger.EXPECT().Debug("started Reset method for the service", "service", "resetter.plugin1").MinTimes(1) + mockLogger.EXPECT().Debug("finished Reset method for the service", "service", "resetter.plugin1").MinTimes(1) + mockLogger.EXPECT().Warn("listener accept error, connection closed", "error", gomock.Any()).AnyTimes() + err = cont.RegisterAll( cfg, &server.Plugin{}, - &logger.ZapLogger{}, + mockLogger, &resetter.Plugin{}, &rpcPlugin.Plugin{}, &Plugin1{}, diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go index 7d53bca0..61942516 100644 --- a/tests/plugins/resetter/test_plugin.go +++ b/tests/plugins/resetter/test_plugin.go @@ -15,10 +15,10 @@ var testPoolConfig = poolImpl.Config{ AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, Supervisor: &poolImpl.SupervisorConfig{ - WatchTick: 60, - TTL: 1000, - IdleTTL: 10, - ExecTTL: 10, + WatchTick: 60 * time.Second, + TTL: 1000 * time.Second, + IdleTTL: 10 * time.Second, + ExecTTL: 10 * time.Second, MaxWorkerMemory: 1000, }, } diff --git a/tests/plugins/rpc/rpc_test.go b/tests/plugins/rpc/rpc_test.go index 98959b28..49d3b3f1 100644 --- a/tests/plugins/rpc/rpc_test.go +++ b/tests/plugins/rpc/rpc_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index 5eb2fed1..af34b4d3 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -5,9 +5,8 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/pkg/payload" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" @@ -16,16 +15,16 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &poolImpl.SupervisorConfig{ - WatchTick: 60, - TTL: 1000, - IdleTTL: 10, - ExecTTL: 10, + Supervisor: &pool.SupervisorConfig{ + WatchTick: 60 * time.Second, + TTL: 1000 * time.Second, + IdleTTL: 10 * time.Second, + ExecTTL: 10 * time.Second, MaxWorkerMemory: 1000, }, } @@ -80,11 +79,7 @@ func (f *Foo) Serve() chan error { } // test that our worker is functional - sw, err := worker.From(w) - if err != nil { - errCh <- err - return errCh - } + sw := worker.From(w) rsp, err := sw.Exec(r) if err != nil { diff --git a/tests/plugins/server/plugin_sockets.go b/tests/plugins/server/plugin_sockets.go index ede67ded..0b2857e3 100644 --- a/tests/plugins/server/plugin_sockets.go +++ b/tests/plugins/server/plugin_sockets.go @@ -4,8 +4,8 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" @@ -60,11 +60,7 @@ func (f *Foo2) Serve() chan error { } // test that our worker is functional - sw, err := worker.From(w) - if err != nil { - errCh <- err - return errCh - } + sw := worker.From(w) rsp, err := sw.Exec(r) if err != nil { diff --git a/tests/plugins/server/plugin_tcp.go b/tests/plugins/server/plugin_tcp.go index 98c13b2b..ef4cea39 100644 --- a/tests/plugins/server/plugin_tcp.go +++ b/tests/plugins/server/plugin_tcp.go @@ -4,8 +4,8 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" @@ -60,11 +60,7 @@ func (f *Foo3) Serve() chan error { } // test that our worker is functional - sw, err := worker.From(w) - if err != nil { - errCh <- err - return errCh - } + sw := worker.From(w) rsp, err := sw.Exec(r) if err != nil { diff --git a/tests/plugins/server/server_plugin_test.go b/tests/plugins/server/server_plugin_test.go index d63b0ccd..f600832a 100644 --- a/tests/plugins/server/server_plugin_test.go +++ b/tests/plugins/server/server_plugin_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" diff --git a/tests/plugins/static/configs/.rr-http-static-disabled.yaml b/tests/plugins/static/configs/.rr-http-static-disabled.yaml index e31baffc..9f04b8a9 100644 --- a/tests/plugins/static/configs/.rr-http-static-disabled.yaml +++ b/tests/plugins/static/configs/.rr-http-static-disabled.yaml @@ -11,7 +11,7 @@ http: debug: true address: 127.0.0.1:21234 max_request_size: 1024 - middleware: [ "gzip", "static" ] + middleware: [ "gzip" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] uploads: forbid: [ ".php", ".exe", ".bat" ] @@ -19,10 +19,9 @@ http: dir: "abc" #not exists forbid: [ ".php", ".htaccess" ] request: - "Example-Request-Header": "Value" - # Automatically add headers to every response. + Example-Request-Header: "Value" response: - "X-Powered-By": "RoadRunner" + X-Powered-By: "RoadRunner" pool: num_workers: 2 max_jobs: 0 diff --git a/tests/plugins/static/configs/.rr-http-static-files-disable.yaml b/tests/plugins/static/configs/.rr-http-static-files-disable.yaml index deb408db..3d4d50b9 100644 --- a/tests/plugins/static/configs/.rr-http-static-files-disable.yaml +++ b/tests/plugins/static/configs/.rr-http-static-files-disable.yaml @@ -11,7 +11,7 @@ http: debug: true address: 127.0.0.1:45877 max_request_size: 1024 - middleware: [ "gzip", "static" ] + middleware: [ "gzip" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] uploads: forbid: [ ".php", ".exe", ".bat" ] @@ -19,10 +19,10 @@ http: dir: "../../../tests" forbid: [ ".php" ] request: - "Example-Request-Header": "Value" + Example-Request-Header: "Value" # Automatically add headers to every response. response: - "X-Powered-By": "RoadRunner" + X-Powered-By: "RoadRunner" pool: num_workers: 2 max_jobs: 0 diff --git a/tests/plugins/static/static_plugin_test.go b/tests/plugins/static/static_plugin_test.go index 49721e44..d43ef765 100644 --- a/tests/plugins/static/static_plugin_test.go +++ b/tests/plugins/static/static_plugin_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/spiral/endure" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -209,8 +209,9 @@ func TestStaticDisabled(t *testing.T) { func staticDisabled(t *testing.T) { _, r, err := get("http://localhost:21234/sample.txt") //nolint:bodyclose - assert.Error(t, err) - assert.Nil(t, r) + assert.NoError(t, err) + assert.NotNil(t, r) + assert.Empty(t, r.Header.Get("X-Powered-By")) } func TestStaticFilesDisabled(t *testing.T) { diff --git a/tools/process.go b/tools/process.go index 50fe1616..a6eb1139 100644 --- a/tools/process.go +++ b/tools/process.go @@ -3,7 +3,7 @@ package tools import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // ProcessState provides information about specific worker. @@ -15,7 +15,7 @@ type ProcessState struct { Status string `json:"status"` // Number of worker executions. - NumJobs int64 `json:"numExecs"` + NumJobs uint64 `json:"numExecs"` // Created is unix nano timestamp of worker creation time. Created int64 `json:"created"` diff --git a/tools/worker_table.go b/tools/worker_table.go index 4aeb6ae7..20b8084f 100644 --- a/tools/worker_table.go +++ b/tools/worker_table.go @@ -52,8 +52,9 @@ func renderStatus(status string) string { return status } -func renderJobs(number int64) string { - return humanize.Comma(number) +func renderJobs(number uint64) string { + // TODO overflow + return humanize.Comma(int64(number)) } func renderAlive(t time.Time) string { diff --git a/utils/network.go b/utils/network.go index c9db0e68..e57854a8 100755 --- a/utils/network.go +++ b/utils/network.go @@ -12,6 +12,14 @@ import ( "github.com/valyala/tcplisten" ) +// - SO_REUSEPORT. This option allows linear scaling server performance +// on multi-CPU servers. +// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details. +// +// - TCP_DEFER_ACCEPT. This option expects the server reads from the accepted +// connection before writing to them. +// +// - TCP_FASTOPEN. See https://lwn.net/Articles/508865/ for details. // CreateListener crates socket listener based on DSN definition. func CreateListener(address string) (net.Listener, error) { dsn := strings.Split(address, "://") diff --git a/utils/network_test.go b/utils/network_test.go deleted file mode 100755 index cfed98f9..00000000 --- a/utils/network_test.go +++ /dev/null @@ -1,23 +0,0 @@ -// +build linux darwin freebsd - -package utils - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestCreateListener(t *testing.T) { - _, err := CreateListener("unexpected dsn") - assert.Error(t, err, "Invalid DSN (tcp://:6001, unix://file.sock)") - - _, err = CreateListener("aaa://192.168.0.1") - assert.Error(t, err, "Invalid Protocol (tcp://:6001, unix://file.sock)") -} - -func TestUnixCreateListener(t *testing.T) { - l, err := CreateListener("unix://file.sock") - assert.NoError(t, err) - l.Close() -} diff --git a/utils/network_windows.go b/utils/network_windows.go index a07ac351..6eefb8f7 100755 --- a/utils/network_windows.go +++ b/utils/network_windows.go @@ -8,8 +8,6 @@ import ( "os" "strings" "syscall" - - "github.com/valyala/tcplisten" ) // CreateListener crates socket listener based on DSN definition. @@ -47,13 +45,7 @@ func CreateListener(address string) (net.Listener, error) { } func createTCPListener(addr string) (net.Listener, error) { - cfg := tcplisten.Config{ - ReusePort: true, - DeferAccept: true, - FastOpen: true, - Backlog: 0, - } - listener, err := cfg.NewListener("tcp4", addr) + listener, err := net.Listen("tcp", addr) if err != nil { return nil, err } diff --git a/utils/network_windows_test.go b/utils/network_windows_test.go deleted file mode 100755 index 59ec0485..00000000 --- a/utils/network_windows_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// +build windows - -package utils - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestCreateListener(t *testing.T) { - _, err := CreateListener("unexpected dsn") - assert.Error(t, err, "Invalid DSN (tcp://:6001, unix://file.sock)") - - _, err = CreateListener("aaa://192.168.0.1") - assert.Error(t, err, "Invalid Protocol (tcp://:6001, unix://file.sock)") -} |