summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcomposer.json7
-rwxr-xr-xcomposer.lock389
-rwxr-xr-xgo.mod6
-rwxr-xr-xgo.sum5
-rwxr-xr-xpipe_factory.go3
-rw-r--r--plugins/http/attributes/attributes.go76
-rw-r--r--plugins/http/attributes/attributes_test.go79
-rw-r--r--plugins/http/config.go287
-rw-r--r--plugins/http/config_test.go330
-rw-r--r--plugins/http/constants.go6
-rw-r--r--plugins/http/errors.go25
-rw-r--r--plugins/http/errors_windows.go27
-rw-r--r--plugins/http/fcgi_test.go104
-rw-r--r--plugins/http/fixtures/server.crt15
-rw-r--r--plugins/http/fixtures/server.key9
-rw-r--r--plugins/http/h2c_test.go81
-rw-r--r--plugins/http/handler.go208
-rw-r--r--plugins/http/handler_test.go1962
-rw-r--r--plugins/http/parse.go147
-rw-r--r--plugins/http/parse_test.go52
-rw-r--r--plugins/http/plugin.go470
-rw-r--r--plugins/http/plugin_test.go759
-rw-r--r--plugins/http/request.go183
-rw-r--r--plugins/http/response.go105
-rw-r--r--plugins/http/response_test.go162
-rw-r--r--plugins/http/rpc.go34
-rw-r--r--plugins/http/rpc_test.go222
-rw-r--r--plugins/http/ssl_test.go255
-rw-r--r--plugins/http/test/.rr-http.yaml37
-rw-r--r--plugins/http/test/http_test.go72
-rw-r--r--plugins/http/test/psr-worker.php23
-rw-r--r--plugins/http/uploads.go160
-rw-r--r--plugins/http/uploads_config.go45
-rw-r--r--plugins/http/uploads_config_test.go24
-rw-r--r--plugins/http/uploads_test.go435
-rw-r--r--plugins/server/plugin.go78
-rw-r--r--src/Diactoros/ServerRequestFactory.php26
-rw-r--r--src/Diactoros/StreamFactory.php57
-rw-r--r--src/Diactoros/UploadedFileFactory.php36
-rw-r--r--[-rwxr-xr-x]src/Exception/MetricException.php0
-rw-r--r--[-rwxr-xr-x]src/Exception/RoadRunnerException.php0
-rw-r--r--src/Exceptions/RoadRunnerException.php18
-rw-r--r--src/HttpClient.php75
-rw-r--r--src/Metrics.php80
-rw-r--r--src/MetricsInterface.php64
-rw-r--r--src/PSR7Client.php217
-rw-r--r--[-rwxr-xr-x]src/Worker.php0
-rwxr-xr-xstatic_pool.go7
48 files changed, 7299 insertions, 163 deletions
diff --git a/composer.json b/composer.json
index aef75b08..283eaab1 100755
--- a/composer.json
+++ b/composer.json
@@ -18,14 +18,15 @@
"ext-json": "*",
"ext-curl": "*",
"spiral/goridge": "^2.4.2",
- "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0"
+ "psr/http-factory": "^1.0",
+ "psr/http-message": "^1.0",
+ "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0",
+ "laminas/laminas-diactoros": "^1.3 || ^2.0"
},
"config": {
"vendor-dir": "vendor_php"
},
"require-dev": {
- "psr/http-factory": "^1.0",
- "psr/http-message": "^1.0",
"phpstan/phpstan": "~0.12"
},
"scripts": {
diff --git a/composer.lock b/composer.lock
index 183f9fef..ded194f5 100755
--- a/composer.lock
+++ b/composer.lock
@@ -4,9 +4,168 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
- "content-hash": "95535b37e4eb6476a2f89ea1b0f16e48",
+ "content-hash": "439018483d4d3a37c3d369d2587b8311",
"packages": [
{
+ "name": "laminas/laminas-diactoros",
+ "version": "2.4.1",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/laminas/laminas-diactoros.git",
+ "reference": "36ef09b73e884135d2059cc498c938e90821bb57"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/laminas/laminas-diactoros/zipball/36ef09b73e884135d2059cc498c938e90821bb57",
+ "reference": "36ef09b73e884135d2059cc498c938e90821bb57",
+ "shasum": ""
+ },
+ "require": {
+ "laminas/laminas-zendframework-bridge": "^1.0",
+ "php": "^7.1",
+ "psr/http-factory": "^1.0",
+ "psr/http-message": "^1.0"
+ },
+ "conflict": {
+ "phpspec/prophecy": "<1.9.0"
+ },
+ "provide": {
+ "psr/http-factory-implementation": "1.0",
+ "psr/http-message-implementation": "1.0"
+ },
+ "replace": {
+ "zendframework/zend-diactoros": "^2.2.1"
+ },
+ "require-dev": {
+ "ext-curl": "*",
+ "ext-dom": "*",
+ "ext-gd": "*",
+ "ext-libxml": "*",
+ "http-interop/http-factory-tests": "^0.5.0",
+ "laminas/laminas-coding-standard": "~1.0.0",
+ "php-http/psr7-integration-tests": "^1.0",
+ "phpunit/phpunit": "^7.5.18"
+ },
+ "type": "library",
+ "extra": {
+ "laminas": {
+ "config-provider": "Laminas\\Diactoros\\ConfigProvider",
+ "module": "Laminas\\Diactoros"
+ }
+ },
+ "autoload": {
+ "files": [
+ "src/functions/create_uploaded_file.php",
+ "src/functions/marshal_headers_from_sapi.php",
+ "src/functions/marshal_method_from_sapi.php",
+ "src/functions/marshal_protocol_version_from_sapi.php",
+ "src/functions/marshal_uri_from_sapi.php",
+ "src/functions/normalize_server.php",
+ "src/functions/normalize_uploaded_files.php",
+ "src/functions/parse_cookie_header.php",
+ "src/functions/create_uploaded_file.legacy.php",
+ "src/functions/marshal_headers_from_sapi.legacy.php",
+ "src/functions/marshal_method_from_sapi.legacy.php",
+ "src/functions/marshal_protocol_version_from_sapi.legacy.php",
+ "src/functions/marshal_uri_from_sapi.legacy.php",
+ "src/functions/normalize_server.legacy.php",
+ "src/functions/normalize_uploaded_files.legacy.php",
+ "src/functions/parse_cookie_header.legacy.php"
+ ],
+ "psr-4": {
+ "Laminas\\Diactoros\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "BSD-3-Clause"
+ ],
+ "description": "PSR HTTP Message implementations",
+ "homepage": "https://laminas.dev",
+ "keywords": [
+ "http",
+ "laminas",
+ "psr",
+ "psr-17",
+ "psr-7"
+ ],
+ "support": {
+ "chat": "https://laminas.dev/chat",
+ "docs": "https://docs.laminas.dev/laminas-diactoros/",
+ "forum": "https://discourse.laminas.dev",
+ "issues": "https://github.com/laminas/laminas-diactoros/issues",
+ "rss": "https://github.com/laminas/laminas-diactoros/releases.atom",
+ "source": "https://github.com/laminas/laminas-diactoros"
+ },
+ "funding": [
+ {
+ "url": "https://funding.communitybridge.org/projects/laminas-project",
+ "type": "community_bridge"
+ }
+ ],
+ "time": "2020-09-03T14:29:41+00:00"
+ },
+ {
+ "name": "laminas/laminas-zendframework-bridge",
+ "version": "1.1.1",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/laminas/laminas-zendframework-bridge.git",
+ "reference": "6ede70583e101030bcace4dcddd648f760ddf642"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/laminas/laminas-zendframework-bridge/zipball/6ede70583e101030bcace4dcddd648f760ddf642",
+ "reference": "6ede70583e101030bcace4dcddd648f760ddf642",
+ "shasum": ""
+ },
+ "require": {
+ "php": "^5.6 || ^7.0 || ^8.0"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "^5.7 || ^6.5 || ^7.5 || ^8.1 || ^9.3",
+ "squizlabs/php_codesniffer": "^3.5"
+ },
+ "type": "library",
+ "extra": {
+ "laminas": {
+ "module": "Laminas\\ZendFrameworkBridge"
+ }
+ },
+ "autoload": {
+ "files": [
+ "src/autoload.php"
+ ],
+ "psr-4": {
+ "Laminas\\ZendFrameworkBridge\\": "src//"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "BSD-3-Clause"
+ ],
+ "description": "Alias legacy ZF class names to Laminas Project equivalents.",
+ "keywords": [
+ "ZendFramework",
+ "autoloading",
+ "laminas",
+ "zf"
+ ],
+ "support": {
+ "forum": "https://discourse.laminas.dev/",
+ "issues": "https://github.com/laminas/laminas-zendframework-bridge/issues",
+ "rss": "https://github.com/laminas/laminas-zendframework-bridge/releases.atom",
+ "source": "https://github.com/laminas/laminas-zendframework-bridge"
+ },
+ "funding": [
+ {
+ "url": "https://funding.communitybridge.org/projects/laminas-project",
+ "type": "community_bridge"
+ }
+ ],
+ "time": "2020-09-14T14:23:00+00:00"
+ },
+ {
"name": "psr/container",
"version": "1.0.0",
"source": {
@@ -60,6 +219,114 @@
"time": "2017-02-14T16:28:37+00:00"
},
{
+ "name": "psr/http-factory",
+ "version": "1.0.1",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/php-fig/http-factory.git",
+ "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/php-fig/http-factory/zipball/12ac7fcd07e5b077433f5f2bee95b3a771bf61be",
+ "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be",
+ "shasum": ""
+ },
+ "require": {
+ "php": ">=7.0.0",
+ "psr/http-message": "^1.0"
+ },
+ "type": "library",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "1.0.x-dev"
+ }
+ },
+ "autoload": {
+ "psr-4": {
+ "Psr\\Http\\Message\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "PHP-FIG",
+ "homepage": "http://www.php-fig.org/"
+ }
+ ],
+ "description": "Common interfaces for PSR-7 HTTP message factories",
+ "keywords": [
+ "factory",
+ "http",
+ "message",
+ "psr",
+ "psr-17",
+ "psr-7",
+ "request",
+ "response"
+ ],
+ "support": {
+ "source": "https://github.com/php-fig/http-factory/tree/master"
+ },
+ "time": "2019-04-30T12:38:16+00:00"
+ },
+ {
+ "name": "psr/http-message",
+ "version": "1.0.1",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/php-fig/http-message.git",
+ "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363",
+ "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363",
+ "shasum": ""
+ },
+ "require": {
+ "php": ">=5.3.0"
+ },
+ "type": "library",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "1.0.x-dev"
+ }
+ },
+ "autoload": {
+ "psr-4": {
+ "Psr\\Http\\Message\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "PHP-FIG",
+ "homepage": "http://www.php-fig.org/"
+ }
+ ],
+ "description": "Common interface for HTTP messages",
+ "homepage": "https://github.com/php-fig/http-message",
+ "keywords": [
+ "http",
+ "http-message",
+ "psr",
+ "psr-7",
+ "request",
+ "response"
+ ],
+ "support": {
+ "source": "https://github.com/php-fig/http-message/tree/master"
+ },
+ "time": "2016-08-06T14:39:51+00:00"
+ },
+ {
"name": "spiral/goridge",
"version": "v2.4.5",
"source": {
@@ -851,16 +1118,16 @@
"packages-dev": [
{
"name": "phpstan/phpstan",
- "version": "0.12.53",
+ "version": "0.12.56",
"source": {
"type": "git",
"url": "https://github.com/phpstan/phpstan.git",
- "reference": "dbbdb0d7c2434ecd5289f6114d16473e694caa67"
+ "reference": "007fd5d700c41e1bb27795fae15a2383f8fa4ba1"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/phpstan/phpstan/zipball/dbbdb0d7c2434ecd5289f6114d16473e694caa67",
- "reference": "dbbdb0d7c2434ecd5289f6114d16473e694caa67",
+ "url": "https://api.github.com/repos/phpstan/phpstan/zipball/007fd5d700c41e1bb27795fae15a2383f8fa4ba1",
+ "reference": "007fd5d700c41e1bb27795fae15a2383f8fa4ba1",
"shasum": ""
},
"require": {
@@ -891,7 +1158,7 @@
"description": "PHPStan - PHP Static Analysis Tool",
"support": {
"issues": "https://github.com/phpstan/phpstan/issues",
- "source": "https://github.com/phpstan/phpstan/tree/0.12.53"
+ "source": "https://github.com/phpstan/phpstan/tree/0.12.56"
},
"funding": [
{
@@ -907,115 +1174,7 @@
"type": "tidelift"
}
],
- "time": "2020-11-01T14:51:50+00:00"
- },
- {
- "name": "psr/http-factory",
- "version": "1.0.1",
- "source": {
- "type": "git",
- "url": "https://github.com/php-fig/http-factory.git",
- "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be"
- },
- "dist": {
- "type": "zip",
- "url": "https://api.github.com/repos/php-fig/http-factory/zipball/12ac7fcd07e5b077433f5f2bee95b3a771bf61be",
- "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be",
- "shasum": ""
- },
- "require": {
- "php": ">=7.0.0",
- "psr/http-message": "^1.0"
- },
- "type": "library",
- "extra": {
- "branch-alias": {
- "dev-master": "1.0.x-dev"
- }
- },
- "autoload": {
- "psr-4": {
- "Psr\\Http\\Message\\": "src/"
- }
- },
- "notification-url": "https://packagist.org/downloads/",
- "license": [
- "MIT"
- ],
- "authors": [
- {
- "name": "PHP-FIG",
- "homepage": "http://www.php-fig.org/"
- }
- ],
- "description": "Common interfaces for PSR-7 HTTP message factories",
- "keywords": [
- "factory",
- "http",
- "message",
- "psr",
- "psr-17",
- "psr-7",
- "request",
- "response"
- ],
- "support": {
- "source": "https://github.com/php-fig/http-factory/tree/master"
- },
- "time": "2019-04-30T12:38:16+00:00"
- },
- {
- "name": "psr/http-message",
- "version": "1.0.1",
- "source": {
- "type": "git",
- "url": "https://github.com/php-fig/http-message.git",
- "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363"
- },
- "dist": {
- "type": "zip",
- "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363",
- "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363",
- "shasum": ""
- },
- "require": {
- "php": ">=5.3.0"
- },
- "type": "library",
- "extra": {
- "branch-alias": {
- "dev-master": "1.0.x-dev"
- }
- },
- "autoload": {
- "psr-4": {
- "Psr\\Http\\Message\\": "src/"
- }
- },
- "notification-url": "https://packagist.org/downloads/",
- "license": [
- "MIT"
- ],
- "authors": [
- {
- "name": "PHP-FIG",
- "homepage": "http://www.php-fig.org/"
- }
- ],
- "description": "Common interface for HTTP messages",
- "homepage": "https://github.com/php-fig/http-message",
- "keywords": [
- "http",
- "http-message",
- "psr",
- "psr-7",
- "request",
- "response"
- ],
- "support": {
- "source": "https://github.com/php-fig/http-message/tree/master"
- },
- "time": "2016-08-06T14:39:51+00:00"
+ "time": "2020-11-16T22:59:18+00:00"
}
],
"aliases": [],
diff --git a/go.mod b/go.mod
index 21500c30..e0d0305b 100755
--- a/go.mod
+++ b/go.mod
@@ -3,11 +3,14 @@ module github.com/spiral/roadrunner/v2
go 1.15
require (
+ github.com/cenkalti/backoff/v4 v4.1.0
github.com/fatih/color v1.10.0
+ github.com/hashicorp/go-multierror v1.0.0
github.com/json-iterator/go v1.1.10
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/shirou/gopsutil v3.20.10+incompatible
+ github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta19
github.com/spiral/errors v1.0.4
@@ -16,6 +19,7 @@ require (
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
+ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1
-) \ No newline at end of file
+)
diff --git a/go.sum b/go.sum
index 343f4fcf..98942460 100755
--- a/go.sum
+++ b/go.sum
@@ -115,10 +115,12 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
+github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
@@ -236,6 +238,7 @@ github.com/shirou/gopsutil v3.20.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -341,7 +344,9 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 h1:MsuvTghUPjX762sGLnGsxC3HM0B5r83wEtYcYR8/vRs=
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
diff --git a/pipe_factory.go b/pipe_factory.go
index 15f38e42..76a3780e 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -11,8 +11,7 @@ import (
// PipeFactory connects to stack using standard
// streams (STDIN, STDOUT pipes).
-type PipeFactory struct {
-}
+type PipeFactory struct{}
// NewPipeFactory returns new factory instance and starts
// listening
diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go
new file mode 100644
index 00000000..77d6ea69
--- /dev/null
+++ b/plugins/http/attributes/attributes.go
@@ -0,0 +1,76 @@
+package attributes
+
+import (
+ "context"
+ "errors"
+ "net/http"
+)
+
+type attrKey int
+
+const contextKey attrKey = iota
+
+type attrs map[string]interface{}
+
+func (v attrs) get(key string) interface{} {
+ if v == nil {
+ return ""
+ }
+
+ return v[key]
+}
+
+func (v attrs) set(key string, value interface{}) {
+ v[key] = value
+}
+
+func (v attrs) del(key string) {
+ delete(v, key)
+}
+
+// Init returns request with new context and attribute bag.
+func Init(r *http.Request) *http.Request {
+ return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{}))
+}
+
+// All returns all context attributes.
+func All(r *http.Request) map[string]interface{} {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return attrs{}
+ }
+
+ return v.(attrs)
+}
+
+// Get gets the value from request context. It replaces any existing
+// values.
+func Get(r *http.Request, key string) interface{} {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return nil
+ }
+
+ return v.(attrs).get(key)
+}
+
+// Set sets the key to value. It replaces any existing
+// values. Context specific.
+func Set(r *http.Request, key string, value interface{}) error {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return errors.New("unable to find `psr:attributes` context key")
+ }
+
+ v.(attrs).set(key, value)
+ return nil
+}
+
+// Delete deletes values associated with attribute key.
+func (v attrs) Delete(key string) {
+ if v == nil {
+ return
+ }
+
+ v.del(key)
+}
diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go
new file mode 100644
index 00000000..a4c85eea
--- /dev/null
+++ b/plugins/http/attributes/attributes_test.go
@@ -0,0 +1,79 @@
+package attributes
+
+import (
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestAllAttributes(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+
+ assert.Equal(t, All(r), map[string]interface{}{
+ "key": "value",
+ })
+}
+
+func TestAllAttributesNone(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ assert.Equal(t, All(r), map[string]interface{}{})
+}
+
+func TestAllAttributesNone2(t *testing.T) {
+ r := &http.Request{}
+
+ assert.Equal(t, All(r), map[string]interface{}{})
+}
+
+func TestGetAttribute(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+ assert.Equal(t, Get(r, "key"), "value")
+}
+
+func TestGetAttributeNone(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ assert.Equal(t, Get(r, "key"), nil)
+}
+
+func TestGetAttributeNone2(t *testing.T) {
+ r := &http.Request{}
+
+ assert.Equal(t, Get(r, "key"), nil)
+}
+
+func TestSetAttribute(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+ assert.Equal(t, Get(r, "key"), "value")
+}
+
+func TestSetAttributeNone(t *testing.T) {
+ r := &http.Request{}
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+ assert.Equal(t, Get(r, "key"), nil)
+}
diff --git a/plugins/http/config.go b/plugins/http/config.go
new file mode 100644
index 00000000..b3f4ca13
--- /dev/null
+++ b/plugins/http/config.go
@@ -0,0 +1,287 @@
+package http
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/spiral/roadrunner/v2"
+)
+
+type ServerConfig struct {
+ // Command includes command strings with all the parameters, example: "php worker.php pipes".
+ Command string
+
+ // User under which process will be started
+ User string
+
+ // Relay defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://pool.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration.
+ RelayTimeout time.Duration
+
+ // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
+ // while server is running.
+
+
+ env map[string]string
+}
+
+// Config configures RoadRunner HTTP server.
+type Config struct {
+ // Port and port to handle as http server.
+ Address string
+
+ // SSL defines https server options.
+ SSL SSLConfig
+
+ // FCGI configuration. You can use FastCGI without HTTP server.
+ FCGI *FCGIConfig
+
+ // HTTP2 configuration
+ HTTP2 *HTTP2Config
+
+ // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
+ MaxRequestSize int64
+
+ // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For
+ TrustedSubnets []string
+ cidrs []*net.IPNet
+
+ // Uploads configures uploads configuration.
+ Uploads *UploadsConfig
+
+ // Pool configures worker pool.
+ Pool *roadrunner.PoolConfig
+}
+
+// FCGIConfig for FastCGI server.
+type FCGIConfig struct {
+ // Address and port to handle as http server.
+ Address string
+}
+
+// HTTP2Config HTTP/2 server customizations.
+type HTTP2Config struct {
+ // Enable or disable HTTP/2 extension, default enable.
+ Enabled bool
+
+ // H2C enables HTTP/2 over TCP
+ H2C bool
+
+ // MaxConcurrentStreams defaults to 128.
+ MaxConcurrentStreams uint32
+}
+
+// InitDefaults sets default values for HTTP/2 configuration.
+func (cfg *HTTP2Config) InitDefaults() error {
+ cfg.Enabled = true
+ cfg.MaxConcurrentStreams = 128
+
+ return nil
+}
+
+// SSLConfig defines https server configuration.
+type SSLConfig struct {
+ // Port to listen as HTTPS server, defaults to 443.
+ Port int
+
+ // Redirect when enabled forces all http connections to switch to https.
+ Redirect bool
+
+ // Key defined private server key.
+ Key string
+
+ // Cert is https certificate.
+ Cert string
+
+ // Root CA file
+ RootCA string
+}
+
+// EnableHTTP is true when http server must run.
+func (c *Config) EnableHTTP() bool {
+ return c.Address != ""
+}
+
+// EnableTLS returns true if pool must listen TLS connections.
+func (c *Config) EnableTLS() bool {
+ return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != ""
+}
+
+// EnableHTTP2 when HTTP/2 extension must be enabled (only with TSL).
+func (c *Config) EnableHTTP2() bool {
+ return c.HTTP2.Enabled
+}
+
+// EnableH2C when HTTP/2 extension must be enabled on TCP.
+func (c *Config) EnableH2C() bool {
+ return c.HTTP2.H2C
+}
+
+// EnableFCGI is true when FastCGI server must be enabled.
+func (c *Config) EnableFCGI() bool {
+ return c.FCGI.Address != ""
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg Config) error {
+ //if c.Workers == nil {
+ // c.Workers = &ServerConfig{}
+ //}
+
+ if c.HTTP2 == nil {
+ c.HTTP2 = &HTTP2Config{}
+ }
+
+ if c.FCGI == nil {
+ c.FCGI = &FCGIConfig{}
+ }
+
+ if c.Uploads == nil {
+ c.Uploads = &UploadsConfig{}
+ }
+
+ if c.SSL.Port == 0 {
+ c.SSL.Port = 443
+ }
+
+ err := c.HTTP2.InitDefaults()
+ if err != nil {
+ return err
+ }
+ err = c.Uploads.InitDefaults()
+ if err != nil {
+ return err
+ }
+ //err = c.Workers.InitDefaults()
+ //if err != nil {
+ // return err
+ //}
+ //
+ //if err := cfg.Unmarshal(c); err != nil {
+ // return err
+ //}
+ //
+ //c.Workers.UpscaleDurations()
+
+ if c.TrustedSubnets == nil {
+ // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses
+ c.TrustedSubnets = []string{
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ }
+ }
+
+ if err := c.parseCIDRs(); err != nil {
+ return err
+ }
+
+ return c.Valid()
+}
+
+func (c *Config) parseCIDRs() error {
+ for _, cidr := range c.TrustedSubnets {
+ _, cr, err := net.ParseCIDR(cidr)
+ if err != nil {
+ return err
+ }
+
+ c.cidrs = append(c.cidrs, cr)
+ }
+
+ return nil
+}
+
+// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For
+func (c *Config) IsTrusted(ip string) bool {
+ if c.cidrs == nil {
+ return false
+ }
+
+ i := net.ParseIP(ip)
+ if i == nil {
+ return false
+ }
+
+ for _, cird := range c.cidrs {
+ if cird.Contains(i) {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Valid validates the configuration.
+func (c *Config) Valid() error {
+ if c.Uploads == nil {
+ return errors.New("malformed uploads config")
+ }
+
+ if c.HTTP2 == nil {
+ return errors.New("malformed http2 config")
+ }
+
+ //if c.Workers == nil {
+ // return errors.New("malformed workers config")
+ //}
+ //
+ //if c.Workers.Pool == nil {
+ // return errors.New("malformed workers config (pool config is missing)")
+ //}
+
+ //if err := c.Workers.Pool.Valid(); err != nil {
+ // return err
+ //}
+
+ if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() {
+ return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)")
+ }
+
+ if c.Address != "" && !strings.Contains(c.Address, ":") {
+ return errors.New("malformed http server address")
+ }
+
+ if c.EnableTLS() {
+ if _, err := os.Stat(c.SSL.Key); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("key file '%s' does not exists", c.SSL.Key)
+ }
+
+ return err
+ }
+
+ if _, err := os.Stat(c.SSL.Cert); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("cert file '%s' does not exists", c.SSL.Cert)
+ }
+
+ return err
+ }
+
+ // RootCA is optional, but if provided - check it
+ if c.SSL.RootCA != "" {
+ if _, err := os.Stat(c.SSL.RootCA); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("root ca path provided, but path '%s' does not exists", c.SSL.RootCA)
+ }
+ return err
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/plugins/http/config_test.go b/plugins/http/config_test.go
new file mode 100644
index 00000000..6d23b6ca
--- /dev/null
+++ b/plugins/http/config_test.go
@@ -0,0 +1,330 @@
+package http
+
+//import (
+// "os"
+// "testing"
+// "time"
+//
+// json "github.com/json-iterator/go"
+// "github.com/stretchr/testify/assert"
+//)
+//
+//type mockCfg struct{ cfg string }
+//
+//func (cfg *mockCfg) Get(name string) service.Config { return nil }
+//func (cfg *mockCfg) Unmarshal(out interface{}) error {
+// j := json.ConfigCompatibleWithStandardLibrary
+// return j.Unmarshal([]byte(cfg.cfg), out)
+//}
+//
+//func Test_Config_Hydrate_Error1(t *testing.T) {
+// cfg := &mockCfg{`{"address": "localhost:8080"}`}
+// c := &Config{}
+//
+// assert.NoError(t, c.Hydrate(cfg))
+//}
+//
+//func Test_Config_Hydrate_Error2(t *testing.T) {
+// cfg := &mockCfg{`{"dir": "/dir/"`}
+// c := &Config{}
+//
+// assert.Error(t, c.Hydrate(cfg))
+//}
+//
+//func Test_Config_Valid(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.NoError(t, cfg.Valid())
+//}
+//
+//func Test_Trusted_Subnets(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// TrustedSubnets: []string{"200.1.0.0/16"},
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.NoError(t, cfg.parseCIDRs())
+//
+// assert.True(t, cfg.IsTrusted("200.1.0.10"))
+// assert.False(t, cfg.IsTrusted("127.0.0.0.1"))
+//}
+//
+//func Test_Trusted_Subnets_Err(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// TrustedSubnets: []string{"200.1.0.0"},
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.parseCIDRs())
+//}
+//
+//func Test_Config_Valid_SSL(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// SSL: SSLConfig{
+// Cert: "fixtures/server.crt",
+// Key: "fixtures/server.key",
+// },
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"}))
+//
+// assert.NoError(t, cfg.Valid())
+// assert.True(t, cfg.EnableTLS())
+// assert.Equal(t, 443, cfg.SSL.Port)
+//}
+//
+//func Test_Config_SSL_No_key(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// SSL: SSLConfig{
+// Cert: "fixtures/server.crt",
+// },
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_SSL_No_Cert(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// SSL: SSLConfig{
+// Key: "fixtures/server.key",
+// },
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_NoUploads(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_NoHTTP2(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 0,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_NoWorkers(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_NoPool(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 0,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_DeadPool(t *testing.T) {
+// cfg := &Config{
+// Address: ":8080",
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
+//
+//func Test_Config_InvalidAddress(t *testing.T) {
+// cfg := &Config{
+// Address: "unexpected_address",
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// HTTP2: &HTTP2Config{
+// Enabled: true,
+// },
+// Workers: &roadrunner.ServerConfig{
+// Command: "php tests/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// },
+// }
+//
+// assert.Error(t, cfg.Valid())
+//}
diff --git a/plugins/http/constants.go b/plugins/http/constants.go
new file mode 100644
index 00000000..a25f52a4
--- /dev/null
+++ b/plugins/http/constants.go
@@ -0,0 +1,6 @@
+package http
+
+import "net/http"
+
+var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
+var trailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/plugins/http/errors.go b/plugins/http/errors.go
new file mode 100644
index 00000000..fb8762ef
--- /dev/null
+++ b/plugins/http/errors.go
@@ -0,0 +1,25 @@
+// +build !windows
+
+package http
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+// Broken pipe
+var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
+
+// handleWriteError just check if error was caused by aborted connection on linux
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if syscallErr.Err == syscall.EPIPE {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/plugins/http/errors_windows.go b/plugins/http/errors_windows.go
new file mode 100644
index 00000000..3d0ba04c
--- /dev/null
+++ b/plugins/http/errors_windows.go
@@ -0,0 +1,27 @@
+// +build windows
+
+package http
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+//Software caused connection abort.
+//An established connection was aborted by the software in your host computer,
+//possibly due to a data transmission time-out or protocol error.
+var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
+
+// handleWriteError just check if error was caused by aborted connection on windows
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if syscallErr.Err == syscall.WSAECONNABORTED {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/plugins/http/fcgi_test.go b/plugins/http/fcgi_test.go
new file mode 100644
index 00000000..82b7d1c4
--- /dev/null
+++ b/plugins/http/fcgi_test.go
@@ -0,0 +1,104 @@
+package http
+
+//
+//import (
+// "io/ioutil"
+// "net/http/fcgi"
+// "net/http/httptest"
+// "testing"
+// "time"
+//
+// "github.com/stretchr/testify/assert"
+//)
+//
+//func Test_FCGI_Service_Echo(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "fcgi": {
+// "address": "tcp://0.0.0.0:6082"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() { assert.NoError(t, c.Serve()) }()
+// time.Sleep(time.Second * 1)
+//
+// fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6082")
+//
+// fcgiHandler := gofast.NewHandler(
+// gofast.BasicParamsMap(gofast.BasicSession),
+// gofast.SimpleClientFactory(fcgiConnFactory, 0),
+// )
+//
+// w := httptest.NewRecorder()
+// req := httptest.NewRequest("GET", "http://site.local/?hello=world", nil)
+// fcgiHandler.ServeHTTP(w, req)
+//
+// body, err := ioutil.ReadAll(w.Result().Body)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, w.Result().StatusCode)
+// assert.Equal(t, "WORLD", string(body))
+// c.Stop()
+//}
+//
+//func Test_FCGI_Service_Request_Uri(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "fcgi": {
+// "address": "tcp://0.0.0.0:6083"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php request-uri pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() { assert.NoError(t, c.Serve()) }()
+// time.Sleep(time.Second * 1)
+//
+// fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6083")
+//
+// fcgiHandler := gofast.NewHandler(
+// gofast.BasicParamsMap(gofast.BasicSession),
+// gofast.SimpleClientFactory(fcgiConnFactory, 0),
+// )
+//
+// w := httptest.NewRecorder()
+// req := httptest.NewRequest("GET", "http://site.local/hello-world", nil)
+// fcgiHandler.ServeHTTP(w, req)
+//
+// body, err := ioutil.ReadAll(w.Result().Body)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, w.Result().StatusCode)
+// assert.Equal(t, "http://site.local/hello-world", string(body))
+// c.Stop()
+//}
diff --git a/plugins/http/fixtures/server.crt b/plugins/http/fixtures/server.crt
new file mode 100644
index 00000000..24d67fd7
--- /dev/null
+++ b/plugins/http/fixtures/server.crt
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICTTCCAdOgAwIBAgIJAOKyUd+llTRKMAoGCCqGSM49BAMCMGMxCzAJBgNVBAYT
+AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv
+MRMwEQYDVQQKDApSb2FkUnVubmVyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTgw
+OTMwMTMzNDUzWhcNMjgwOTI3MTMzNDUzWjBjMQswCQYDVQQGEwJVUzETMBEGA1UE
+CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzETMBEGA1UECgwK
+Um9hZFJ1bm5lcjESMBAGA1UEAwwJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EE
+ACIDYgAEVnbShsM+l5RR3wfWWmGhzuFGwNzKCk7i9xyobDIyBUxG/UUSfj7KKlUX
+puDnDEtF5xXcepl744CyIAYFLOXHb5WqI4jCOzG0o9f/00QQ4bQudJOdbqV910QF
+C2vb7Fxro1MwUTAdBgNVHQ4EFgQU9xUexnbB6ORKayA7Pfjzs33otsAwHwYDVR0j
+BBgwFoAU9xUexnbB6ORKayA7Pfjzs33otsAwDwYDVR0TAQH/BAUwAwEB/zAKBggq
+hkjOPQQDAgNoADBlAjEAue3HhR/MUhxoa9tSDBtOJT3FYbDQswrsdqBTz97CGKst
+e7XeZ3HMEvEXy0hGGEMhAjAqcD/4k9vViVppgWFtkk6+NFbm+Kw/QeeAiH5FgFSj
+8xQcb+b7nPwNLp3JOkXkVd4=
+-----END CERTIFICATE-----
diff --git a/plugins/http/fixtures/server.key b/plugins/http/fixtures/server.key
new file mode 100644
index 00000000..7501dd46
--- /dev/null
+++ b/plugins/http/fixtures/server.key
@@ -0,0 +1,9 @@
+-----BEGIN EC PARAMETERS-----
+BgUrgQQAIg==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIGkAgEBBDCQP8utxNbHR6xZOLAJgUhn88r6IrPqmN0MsgGJM/jePB+T9UhkmIU8
+PMm2HeScbcugBwYFK4EEACKhZANiAARWdtKGwz6XlFHfB9ZaYaHO4UbA3MoKTuL3
+HKhsMjIFTEb9RRJ+PsoqVRem4OcMS0XnFdx6mXvjgLIgBgUs5cdvlaojiMI7MbSj
+1//TRBDhtC50k51upX3XRAULa9vsXGs=
+-----END EC PRIVATE KEY-----
diff --git a/plugins/http/h2c_test.go b/plugins/http/h2c_test.go
new file mode 100644
index 00000000..936ca8eb
--- /dev/null
+++ b/plugins/http/h2c_test.go
@@ -0,0 +1,81 @@
+package http
+
+//
+//import (
+// "net/http"
+// "testing"
+// "time"
+//
+// "github.com/cenkalti/backoff/v4"
+// "github.com/stretchr/testify/assert"
+//)
+//
+//func Test_Service_H2C(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "address": ":6029",
+// "http2": {"h2c":true},
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error serving: %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 100)
+// defer c.Stop()
+//
+// req, err := http.NewRequest("PRI", "http://localhost:6029?hello=world", nil)
+// if err != nil {
+// return err
+// }
+//
+// req.Header.Add("Upgrade", "h2c")
+// req.Header.Add("Connection", "HTTP2-Settings")
+// req.Header.Add("HTTP2-Settings", "")
+//
+// r, err2 := http.DefaultClient.Do(req)
+// if err2 != nil {
+// return err2
+// }
+//
+// assert.Equal(t, "101 Switching Protocols", r.Status)
+//
+// err3 := r.Body.Close()
+// if err3 != nil {
+// return err3
+// }
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
new file mode 100644
index 00000000..5b612d7e
--- /dev/null
+++ b/plugins/http/handler.go
@@ -0,0 +1,208 @@
+package http
+
+import (
+ "fmt"
+ "net"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/pkg/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+)
+
+const (
+ // EventResponse thrown after the request been processed. See ErrorEvent as payload.
+ EventResponse = iota + 500
+
+ // EventError thrown on any non job error provided by road runner server.
+ EventError
+)
+
+// ErrorEvent represents singular http error event.
+type ErrorEvent struct {
+ // Request contains client request, must not be stored.
+ Request *http.Request
+
+ // Error - associated error, if any.
+ Error error
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ErrorEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// ResponseEvent represents singular http response event.
+type ResponseEvent struct {
+ // Request contains client request, must not be stored.
+ Request *Request
+
+ // Response contains service response.
+ Response *Response
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ResponseEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Handler struct {
+ cfg *Config
+ log log.Logger
+ rr roadrunner.Pool
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
+}
+
+// Listen attaches handler event controller.
+func (h *Handler) Listen(l func(event int, ctx interface{})) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ h.lsn = l
+}
+
+// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
+
+ // validating request size
+ if h.cfg.MaxRequestSize != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ h.handleError(w, r, err, start)
+ return
+ } else if size > h.cfg.MaxRequestSize*1024*1024 {
+ h.handleError(w, r, errors.New("request body max size is exceeded"), start)
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, h.cfg.Uploads)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ // proxy IP resolution
+ h.resolveIP(req)
+
+ req.Open(h.log)
+ defer req.Close(h.log)
+
+ p, err := req.Payload()
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ rsp, err := h.rr.Exec(p)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ h.handleResponse(req, resp, start)
+ err = resp.Write(w)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ }
+}
+
+// handleError sends error.
+func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) {
+ // if pipe is broken, there is no sense to write the header
+ // in this case we just report about error
+ if err == errEPIPE {
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+ return
+ }
+ // ResponseWriter is ok, write the error code
+ w.WriteHeader(500)
+ _, err2 := w.Write([]byte(err.Error()))
+ // error during the writing to the ResponseWriter
+ if err2 != nil {
+ // concat original error with ResponseWriter error
+ h.throw(EventError, &ErrorEvent{Request: r, Error: errors.New(fmt.Sprintf("error: %v, during handle this error, ResponseWriter error occurred: %v", err, err2)), start: start, elapsed: time.Since(start)})
+ return
+ }
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+}
+
+// handleResponse triggers response event.
+func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
+ h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
+}
+
+// throw invokes event handler if any.
+func (h *Handler) throw(event int, ctx interface{}) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ if h.lsn != nil {
+ h.lsn(event, ctx)
+ }
+}
+
+// get real ip passing multiple proxy
+func (h *Handler) resolveIP(r *Request) {
+ if !h.cfg.IsTrusted(r.RemoteAddr) {
+ return
+ }
+
+ if r.Header.Get("X-Forwarded-For") != "" {
+ ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
+ ipCount := len(ips)
+
+ for i := ipCount - 1; i >= 0; i-- {
+ addr := strings.TrimSpace(ips[i])
+ if net.ParseIP(addr) != nil {
+ r.RemoteAddr = addr
+ return
+ }
+ }
+
+ return
+ }
+
+ // The logic here is the following:
+ // In general case, we only expect X-Real-Ip header. If it exist, we get the IP addres from header and set request Remote address
+ // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
+ // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
+ // CF-Connecting-IP is an Enterprise feature and we check it last in order.
+ // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
+ if r.Header.Get("X-Real-Ip") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip"))
+ return
+ }
+
+ if r.Header.Get("True-Client-IP") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP"))
+ return
+ }
+
+ if r.Header.Get("CF-Connecting-IP") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP"))
+ }
+}
diff --git a/plugins/http/handler_test.go b/plugins/http/handler_test.go
new file mode 100644
index 00000000..d15cf96f
--- /dev/null
+++ b/plugins/http/handler_test.go
@@ -0,0 +1,1962 @@
+package http
+
+//
+//import (
+// "bytes"
+// "context"
+// "github.com/spiral/roadrunner"
+// "github.com/stretchr/testify/assert"
+// "io/ioutil"
+// "mime/multipart"
+// "net/http"
+// "net/http/httptest"
+// "net/url"
+// "os"
+// "runtime"
+// "strings"
+// "testing"
+// "time"
+//)
+//
+//// get request and return body
+//func get(url string) (string, *http.Response, error) {
+// r, err := http.Get(url)
+// if err != nil {
+// return "", nil, err
+// }
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// return "", nil, err
+// }
+//
+// err = r.Body.Close()
+// if err != nil {
+// return "", nil, err
+// }
+// return string(b), r, err
+//}
+//
+//// get request and return body
+//func getHeader(url string, h map[string]string) (string, *http.Response, error) {
+// req, err := http.NewRequest("GET", url, bytes.NewBuffer(nil))
+// if err != nil {
+// return "", nil, err
+// }
+//
+// for k, v := range h {
+// req.Header.Set(k, v)
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// return "", nil, err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// return "", nil, err
+// }
+//
+// err = r.Body.Close()
+// if err != nil {
+// return "", nil, err
+// }
+// return string(b), r, err
+//}
+//
+//func TestHandler_Echo(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// body, r, err := get("http://localhost:8177/?hello=world")
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", body)
+//}
+//
+//func Test_HandlerErrors(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// wr := httptest.NewRecorder()
+// rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("data")))
+//
+// h.ServeHTTP(wr, rq)
+// assert.Equal(t, 500, wr.Code)
+//}
+//
+//func Test_Handler_JSON_error(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// wr := httptest.NewRecorder()
+// rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("{sd")))
+// rq.Header.Add("Content-Type", "application/json")
+// rq.Header.Add("Content-Size", "3")
+//
+// h.ServeHTTP(wr, rq)
+// assert.Equal(t, 500, wr.Code)
+//}
+//
+//func TestHandler_Headers(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php header pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8078", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 100)
+//
+// req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil)
+// assert.NoError(t, err)
+//
+// req.Header.Add("input", "sample")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "world", r.Header.Get("Header"))
+// assert.Equal(t, "SAMPLE", string(b))
+//}
+//
+//func TestHandler_Empty_User_Agent(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php user-agent pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8088", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+// assert.NoError(t, err)
+//
+// req.Header.Add("user-agent", "")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "", string(b))
+//}
+//
+//func TestHandler_User_Agent(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php user-agent pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8088", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+// assert.NoError(t, err)
+//
+// req.Header.Add("User-Agent", "go-agent")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "go-agent", string(b))
+//}
+//
+//func TestHandler_Cookies(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php cookie pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8079", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// req, err := http.NewRequest("GET", "http://localhost:8079", nil)
+// assert.NoError(t, err)
+//
+// req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"})
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "INPUT-VALUE", string(b))
+//
+// for _, c := range r.Cookies() {
+// assert.Equal(t, "output", c.Name)
+// assert.Equal(t, "cookie-output", c.Value)
+// }
+//}
+//
+//func TestHandler_JsonPayload_POST(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php payload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8090", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// req, err := http.NewRequest(
+// "POST",
+// "http://localhost"+hs.Addr,
+// bytes.NewBufferString(`{"key":"value"}`),
+// )
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/json")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, `{"value":"key"}`, string(b))
+//}
+//
+//func TestHandler_JsonPayload_PUT(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php payload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8081", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/json")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, `{"value":"key"}`, string(b))
+//}
+//
+//func TestHandler_JsonPayload_PATCH(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php payload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8082", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/json")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, `{"value":"key"}`, string(b))
+//}
+//
+//func TestHandler_FormData_POST(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8083", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// form := url.Values{}
+//
+// form.Add("key", "value")
+// form.Add("name[]", "name1")
+// form.Add("name[]", "name2")
+// form.Add("name[]", "name3")
+// form.Add("arr[x][y][z]", "y")
+// form.Add("arr[x][y][e]", "f")
+// form.Add("arr[c]p", "l")
+// form.Add("arr[c]z", "")
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_FormData_POST_Overwrite(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8083", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// form := url.Values{}
+//
+// form.Add("key", "value1")
+// form.Add("key", "value2")
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"key":"value2","arr":{"x":{"y":null}}}`, string(b))
+//}
+//
+//func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8083", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// form := url.Values{}
+//
+// form.Add("key", "value")
+// form.Add("name[]", "name1")
+// form.Add("name[]", "name2")
+// form.Add("name[]", "name3")
+// form.Add("arr[x][y][z]", "y")
+// form.Add("arr[x][y][e]", "f")
+// form.Add("arr[c]p", "l")
+// form.Add("arr[c]z", "")
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_FormData_PUT(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8084", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// form := url.Values{}
+//
+// form.Add("key", "value")
+// form.Add("name[]", "name1")
+// form.Add("name[]", "name2")
+// form.Add("name[]", "name3")
+// form.Add("arr[x][y][z]", "y")
+// form.Add("arr[x][y][e]", "f")
+// form.Add("arr[c]p", "l")
+// form.Add("arr[c]z", "")
+//
+// req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_FormData_PATCH(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8085", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// form := url.Values{}
+//
+// form.Add("key", "value")
+// form.Add("name[]", "name1")
+// form.Add("name[]", "name2")
+// form.Add("name[]", "name3")
+// form.Add("arr[x][y][z]", "y")
+// form.Add("arr[x][y][e]", "f")
+// form.Add("arr[c]p", "l")
+// form.Add("arr[c]z", "")
+//
+// req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+// assert.NoError(t, err)
+//
+// req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_Multipart_POST(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8019", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+// err := w.WriteField("key", "value")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("key", "value")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name1")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name2")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name3")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[x][y][z]", "y")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[x][y][e]", "f")
+//
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[c]p", "l")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[c]z", "")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the writer: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_Multipart_PUT(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8020", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+// err := w.WriteField("key", "value")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("key", "value")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name1")
+//
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name2")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name3")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[x][y][z]", "y")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[x][y][e]", "f")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[c]p", "l")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[c]z", "")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the writer: error %v", err)
+// }
+//
+// req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_Multipart_PATCH(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php data pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+//
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+// err := w.WriteField("key", "value")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("key", "value")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name1")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name2")
+//
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("name[]", "name3")
+//
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[x][y][z]", "y")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[x][y][e]", "f")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[c]p", "l")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.WriteField("arr[c]z", "")
+// if err != nil {
+// t.Errorf("error writing the field: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the writer: error %v", err)
+// }
+//
+// req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+//}
+//
+//func TestHandler_Error(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php error pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// _, r, err := get("http://localhost:8177/?hello=world")
+// assert.NoError(t, err)
+// assert.Equal(t, 500, r.StatusCode)
+//}
+//
+//func TestHandler_Error2(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php error2 pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// _, r, err := get("http://localhost:8177/?hello=world")
+// assert.NoError(t, err)
+// assert.Equal(t, 500, r.StatusCode)
+//}
+//
+//func TestHandler_Error3(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php pid pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// b2 := &bytes.Buffer{}
+// for i := 0; i < 1024*1024; i++ {
+// b2.Write([]byte(" "))
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2)
+// assert.NoError(t, err)
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error during the closing Body: error %v", err)
+//
+// }
+// }()
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 500, r.StatusCode)
+//}
+//
+//func TestHandler_ResponseDuration(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// gotresp := make(chan interface{})
+// h.Listen(func(event int, ctx interface{}) {
+// if event == EventResponse {
+// c := ctx.(*ResponseEvent)
+//
+// if c.Elapsed() > 0 {
+// close(gotresp)
+// }
+// }
+// })
+//
+// body, r, err := get("http://localhost:8177/?hello=world")
+// assert.NoError(t, err)
+//
+// <-gotresp
+//
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", body)
+//}
+//
+//func TestHandler_ResponseDurationDelayed(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php echoDelay pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// gotresp := make(chan interface{})
+// h.Listen(func(event int, ctx interface{}) {
+// if event == EventResponse {
+// c := ctx.(*ResponseEvent)
+//
+// if c.Elapsed() > time.Second {
+// close(gotresp)
+// }
+// }
+// })
+//
+// body, r, err := get("http://localhost:8177/?hello=world")
+// assert.NoError(t, err)
+//
+// <-gotresp
+//
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", body)
+//}
+//
+//func TestHandler_ErrorDuration(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php error pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// goterr := make(chan interface{})
+// h.Listen(func(event int, ctx interface{}) {
+// if event == EventError {
+// c := ctx.(*ErrorEvent)
+//
+// if c.Elapsed() > 0 {
+// close(goterr)
+// }
+// }
+// })
+//
+// _, r, err := get("http://localhost:8177/?hello=world")
+// assert.NoError(t, err)
+//
+// <-goterr
+//
+// assert.Equal(t, 500, r.StatusCode)
+//}
+//
+//func TestHandler_IP(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// TrustedSubnets: []string{
+// "10.0.0.0/8",
+// "127.0.0.0/8",
+// "172.16.0.0/12",
+// "192.168.0.0/16",
+// "::1/128",
+// "fc00::/7",
+// "fe80::/10",
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php ip pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// err := h.cfg.parseCIDRs()
+// if err != nil {
+// t.Errorf("error parsing CIDRs: error %v", err)
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// body, r, err := get("http://127.0.0.1:8177/")
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "127.0.0.1", body)
+//}
+//
+//func TestHandler_XRealIP(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// TrustedSubnets: []string{
+// "10.0.0.0/8",
+// "127.0.0.0/8",
+// "172.16.0.0/12",
+// "192.168.0.0/16",
+// "::1/128",
+// "fc00::/7",
+// "fe80::/10",
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php ip pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// err := h.cfg.parseCIDRs()
+// if err != nil {
+// t.Errorf("error parsing CIDRs: error %v", err)
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
+// "X-Real-Ip": "200.0.0.1",
+// })
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "200.0.0.1", body)
+//}
+//
+//func TestHandler_XForwardedFor(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// TrustedSubnets: []string{
+// "10.0.0.0/8",
+// "127.0.0.0/8",
+// "172.16.0.0/12",
+// "192.168.0.0/16",
+// "100.0.0.0/16",
+// "200.0.0.0/16",
+// "::1/128",
+// "fc00::/7",
+// "fe80::/10",
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php ip pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// err := h.cfg.parseCIDRs()
+// if err != nil {
+// t.Errorf("error parsing CIDRs: error %v", err)
+// }
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
+// "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1",
+// })
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "101.0.0.1", body)
+//
+// body, r, err = getHeader("http://127.0.0.1:8177/", map[string]string{
+// "X-Forwarded-For": "100.0.0.1, 200.0.0.1, 101.0.0.1, invalid",
+// })
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "101.0.0.1", body)
+//}
+//
+//func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// TrustedSubnets: []string{
+// "10.0.0.0/8",
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php ip pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// err := h.cfg.parseCIDRs()
+// if err != nil {
+// t.Errorf("error parsing CIDRs: error %v", err)
+// }
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
+// "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1",
+// })
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "127.0.0.1", body)
+//}
+//
+//func BenchmarkHandler_Listen_Echo(b *testing.B) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php echo pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: int64(runtime.NumCPU()),
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// err := h.pool.Start()
+// if err != nil {
+// b.Errorf("error starting the worker pool: error %v", err)
+// }
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8177", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// b.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// b.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// bb := "WORLD"
+// for n := 0; n < b.N; n++ {
+// r, err := http.Get("http://localhost:8177/?hello=world")
+// if err != nil {
+// b.Fail()
+// }
+// // Response might be nil here
+// if r != nil {
+// br, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// b.Errorf("error reading Body: error %v", err)
+// }
+// if string(br) != bb {
+// b.Fail()
+// }
+// err = r.Body.Close()
+// if err != nil {
+// b.Errorf("error closing the Body: error %v", err)
+// }
+// } else {
+// b.Errorf("got nil response")
+// }
+// }
+//}
diff --git a/plugins/http/parse.go b/plugins/http/parse.go
new file mode 100644
index 00000000..9b58d328
--- /dev/null
+++ b/plugins/http/parse.go
@@ -0,0 +1,147 @@
+package http
+
+import (
+ "net/http"
+)
+
+// MaxLevel defines maximum tree depth for incoming request data and files.
+const MaxLevel = 127
+
+type dataTree map[string]interface{}
+type fileTree map[string]interface{}
+
+// parseData parses incoming request body into data tree.
+func parseData(r *http.Request) dataTree {
+ data := make(dataTree)
+ if r.PostForm != nil {
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+ }
+
+ if r.MultipartForm != nil {
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+ }
+
+ return data
+}
+
+// pushes value into data tree.
+func (d dataTree) push(k string, v []string) {
+ keys := fetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(i) == 1 {
+ // single value context (last element)
+ d[i[0]] = v[len(v)-1]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
+
+// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
+func parseUploads(r *http.Request, cfg *UploadsConfig) *Uploads {
+ u := &Uploads{
+ cfg: cfg,
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, NewUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u
+}
+
+// pushes new file upload into it's proper place.
+func (d fileTree) push(k string, v []*FileUpload) {
+ keys := fetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
+
+// fetchIndexes parses input name and splits it into separate indexes list.
+func fetchIndexes(s string) []string {
+ var (
+ pos int
+ ch string
+ keys = make([]string, 1)
+ )
+
+ for _, c := range s {
+ ch = string(c)
+ switch ch {
+ case " ":
+ // ignore all spaces
+ continue
+ case "[":
+ pos = 1
+ continue
+ case "]":
+ if pos == 1 {
+ keys = append(keys, "")
+ }
+ pos = 2
+ default:
+ if pos == 1 || pos == 2 {
+ keys = append(keys, "")
+ }
+
+ keys[len(keys)-1] += ch
+ pos = 0
+ }
+ }
+
+ return keys
+}
diff --git a/plugins/http/parse_test.go b/plugins/http/parse_test.go
new file mode 100644
index 00000000..f95a3f9d
--- /dev/null
+++ b/plugins/http/parse_test.go
@@ -0,0 +1,52 @@
+package http
+
+import "testing"
+
+var samples = []struct {
+ in string
+ out []string
+}{
+ {"key", []string{"key"}},
+ {"key[subkey]", []string{"key", "subkey"}},
+ {"key[subkey]value", []string{"key", "subkey", "value"}},
+ {"key[subkey][value]", []string{"key", "subkey", "value"}},
+ {"key[subkey][value][]", []string{"key", "subkey", "value", ""}},
+ {"key[subkey] [value][]", []string{"key", "subkey", "value", ""}},
+ {"key [ subkey ] [ value ] [ ]", []string{"key", "subkey", "value", ""}},
+}
+
+func Test_FetchIndexes(t *testing.T) {
+ for _, tt := range samples {
+ t.Run(tt.in, func(t *testing.T) {
+ r := fetchIndexes(tt.in)
+ if !same(r, tt.out) {
+ t.Errorf("got %q, want %q", r, tt.out)
+ }
+ })
+ }
+}
+
+func BenchmarkConfig_FetchIndexes(b *testing.B) {
+ for _, tt := range samples {
+ for n := 0; n < b.N; n++ {
+ r := fetchIndexes(tt.in)
+ if !same(r, tt.out) {
+ b.Fail()
+ }
+ }
+ }
+}
+
+func same(in, out []string) bool {
+ if len(in) != len(out) {
+ return false
+ }
+
+ for i, v := range in {
+ if v != out[i] {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
new file mode 100644
index 00000000..581455b3
--- /dev/null
+++ b/plugins/http/plugin.go
@@ -0,0 +1,470 @@
+package http
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/fcgi"
+ "net/url"
+ "strings"
+ "sync"
+
+ "github.com/hashicorp/go-multierror"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ factory "github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/util"
+ "golang.org/x/net/http2"
+ "golang.org/x/net/http2/h2c"
+ "golang.org/x/sys/cpu"
+)
+
+const (
+ // ID contains default service name.
+ ServiceName = "http"
+
+ // EventInitSSL thrown at moment of https initialization. SSL server passed as context.
+ EventInitSSL = 750
+)
+
+//var couldNotAppendPemError = errors.New("could not append Certs from PEM")
+
+// http middleware type.
+type middleware func(f http.HandlerFunc) http.HandlerFunc
+
+// Service manages pool, http servers.
+type Plugin struct {
+ sync.Mutex
+ sync.WaitGroup
+
+ cfg *Config
+ configurer config.Configurer
+ log log.Logger
+
+ mdwr []middleware
+ listeners []util.EventListener
+
+ pool roadrunner.Pool
+ server factory.Server
+ //controller roadrunner.Controller
+ handler *Handler
+
+ http *http.Server
+ https *http.Server
+ fcgi *http.Server
+}
+
+// AddMiddleware adds new net/http mdwr.
+func (s *Plugin) AddMiddleware(m middleware) {
+ s.mdwr = append(s.mdwr, m)
+}
+
+// AddListener attaches server event controller.
+func (s *Plugin) AddListener(listener util.EventListener) {
+ // save listeners for Reset
+ s.listeners = append(s.listeners, listener)
+ s.pool.AddListener(listener)
+}
+
+// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error {
+ const op = errors.Op("http Init")
+ err := cfg.UnmarshalKey(ServiceName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.configurer = cfg
+ s.listeners = make([]util.EventListener, 0, 1)
+ s.log = log
+
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+
+ p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: s.cfg.Pool.Debug,
+ NumWorkers: s.cfg.Pool.NumWorkers,
+ MaxJobs: s.cfg.Pool.MaxJobs,
+ AllocateTimeout: s.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: s.cfg.Pool.DestroyTimeout,
+ Supervisor: nil,
+ }, env)
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.pool = p
+
+ //if r != nil {
+ // if err := r.Register(ID, &rpcServer{s}); err != nil {
+ // return false, err
+ // }
+ //}
+ //
+ //if !cfg.EnableHTTP() && !cfg.EnableTLS() && !cfg.EnableFCGI() {
+ // return false, nil
+ //}
+
+ return nil
+}
+
+// Serve serves the svc.
+func (s *Plugin) Serve() chan error {
+ s.Lock()
+ defer s.Unlock()
+
+ const op = errors.Op("serve http")
+ errCh := make(chan error, 2)
+
+ //if s.env != nil {
+ // if err := s.env.Copy(s.cfg.Workers); err != nil {
+ // return nil
+ // }
+ //}
+ //
+ //s.cfg.Workers.CommandProducer = s.cprod
+ //s.cfg.Workers.SetEnv("RR_HTTP", "true")
+ //
+ //s.pool = roadrunner.NewServer(s.cfg.Workers)
+ //s.pool.Listen(s.throw)
+ //
+ //if s.controller != nil {
+ // s.pool.Attach(s.controller)
+ //}
+
+ s.handler = &Handler{cfg: s.cfg, rr: s.pool}
+ //s.handler.Listen(s.throw)
+
+ if s.cfg.EnableHTTP() {
+ if s.cfg.EnableH2C() {
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: h2c.NewHandler(s, &http2.Server{})}
+ } else {
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
+ }
+ }
+
+ if s.cfg.EnableTLS() {
+ s.https = s.initSSL()
+ if s.cfg.SSL.RootCA != "" {
+ err := s.appendRootCa()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+ }
+
+ if s.cfg.EnableHTTP2() {
+ if err := s.initHTTP2(); err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+ }
+ }
+
+ if s.cfg.EnableFCGI() {
+ s.fcgi = &http.Server{Handler: s}
+ }
+
+ //if err := s.pool.Start(); err != nil {
+ // return err
+ //}
+ //defer s.pool.Stop()
+
+ if s.http != nil {
+ go func() {
+ httpErr := s.http.ListenAndServe()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ errCh <- errors.E(op, httpErr)
+ return
+ }
+ return
+ }()
+ }
+
+ if s.https != nil {
+ go func() {
+ httpErr := s.https.ListenAndServeTLS(
+ s.cfg.SSL.Cert,
+ s.cfg.SSL.Key,
+ )
+
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ errCh <- errors.E(op, httpErr)
+ return
+ }
+ return
+ }()
+ }
+
+ if s.fcgi != nil {
+ go func() {
+ httpErr := s.serveFCGI()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ errCh <- errors.E(op, httpErr)
+ return
+ }
+ return
+ }()
+ }
+
+ return errCh
+}
+
+// Stop stops the http.
+func (s *Plugin) Stop() error {
+ s.Lock()
+ defer s.Unlock()
+
+ var err error
+ if s.fcgi != nil {
+ err = s.fcgi.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the fcgi server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
+ }
+
+ if s.https != nil {
+ err = s.https.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the https server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
+ }
+
+ if s.http != nil {
+ err = s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error("error shutting down the http server", "error", err)
+ // write error and try to stop other transport
+ err = multierror.Append(err)
+ }
+ }
+
+ return err
+}
+
+// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
+func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
+ target := &url.URL{
+ Scheme: "https",
+ Host: s.tlsAddr(r.Host, false),
+ Path: r.URL.Path,
+ RawQuery: r.URL.RawQuery,
+ }
+
+ http.Redirect(w, r, target.String(), http.StatusTemporaryRedirect)
+ return
+ }
+
+ if s.https != nil && r.TLS != nil {
+ w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
+ }
+
+ //r = attributes.Init(r)
+
+ // chaining middleware
+ f := s.handler.ServeHTTP
+ for _, m := range s.mdwr {
+ f = m(f)
+ }
+ f(w, r)
+}
+
+// append RootCA to the https server TLS config
+func (s *Plugin) appendRootCa() error {
+ const op = errors.Op("append root CA")
+ rootCAs, err := x509.SystemCertPool()
+ if err != nil {
+ //s.throw(EventInitSSL, nil)
+ return nil
+ }
+ if rootCAs == nil {
+ rootCAs = x509.NewCertPool()
+ }
+
+ CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA)
+ if err != nil {
+ //s.throw(EventInitSSL, nil)
+ return err
+ }
+
+ // should append our CA cert
+ ok := rootCAs.AppendCertsFromPEM(CA)
+ if !ok {
+ return errors.E(op, errors.Str("could not append Certs from PEM"))
+ }
+ cfg := &tls.Config{
+ InsecureSkipVerify: false,
+ RootCAs: rootCAs,
+ }
+ s.http.TLSConfig = cfg
+
+ return nil
+}
+
+// Init https server
+func (s *Plugin) initSSL() *http.Server {
+ var topCipherSuites []uint16
+ var defaultCipherSuitesTLS13 []uint16
+
+ hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ
+ hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL
+ // Keep in sync with crypto/aes/cipher_s390x.go.
+ hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM)
+
+ hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X
+
+ if hasGCMAsm {
+ // If AES-GCM hardware is provided then prioritise AES-GCM
+ // cipher suites.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ } else {
+ // Without AES-GCM hardware, we put the ChaCha20-Poly1305
+ // cipher suites first.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ }
+
+ DefaultCipherSuites := make([]uint16, 0, 22)
+ DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
+ DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
+
+ server := &http.Server{
+ Addr: s.tlsAddr(s.cfg.Address, true),
+ Handler: s,
+ TLSConfig: &tls.Config{
+ CurvePreferences: []tls.CurveID{
+ tls.CurveP256,
+ tls.CurveP384,
+ tls.CurveP521,
+ tls.X25519,
+ },
+ CipherSuites: DefaultCipherSuites,
+ MinVersion: tls.VersionTLS12,
+ PreferServerCipherSuites: true,
+ },
+ }
+
+ return server
+}
+
+// init http/2 server
+func (s *Plugin) initHTTP2() error {
+ return http2.ConfigureServer(s.https, &http2.Server{
+ MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams,
+ })
+}
+
+// serveFCGI starts FastCGI server.
+func (s *Plugin) serveFCGI() error {
+ l, err := util.CreateListener(s.cfg.FCGI.Address)
+ if err != nil {
+ return err
+ }
+
+ err = fcgi.Serve(l, s.fcgi.Handler)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// throw handles service, server and pool events.
+//func (s *Plugin) throw(event int, ctx interface{}) {
+// for _, l := range s.lsns {
+// l(event, ctx)
+// }
+//
+// if event == roadrunner.EventServerFailure {
+// // underlying pool server is dead
+// s.Stop()
+// }
+//}
+
+// tlsAddr replaces listen or host port with port configured by SSL config.
+func (s *Plugin) tlsAddr(host string, forcePort bool) string {
+ // remove current forcePort first
+ host = strings.Split(host, ":")[0]
+
+ if forcePort || s.cfg.SSL.Port != 443 {
+ host = fmt.Sprintf("%s:%v", host, s.cfg.SSL.Port)
+ }
+
+ return host
+}
+
+// Server returns associated pool workers
+func (s *Plugin) Workers() []roadrunner.WorkerBase {
+ return s.pool.Workers()
+}
+
+func (s *Plugin) Reset() error {
+ s.Lock()
+ defer s.Unlock()
+ s.pool.Destroy(context.Background())
+
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+ var err error
+
+ // re-read the config
+ err = s.configurer.UnmarshalKey(ServiceName, &s.cfg)
+ if err != nil {
+ return err
+ }
+
+ s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: false,
+ NumWorkers: 0,
+ MaxJobs: 0,
+ AllocateTimeout: 0,
+ DestroyTimeout: 0,
+ Supervisor: nil,
+ }, env)
+ if err != nil {
+ return err
+ }
+
+ // restore original listeners
+ for i := 0; i < len(s.listeners); i++ {
+ s.pool.AddListener(s.listeners[i])
+ }
+ return nil
+}
diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go
new file mode 100644
index 00000000..012cea04
--- /dev/null
+++ b/plugins/http/plugin_test.go
@@ -0,0 +1,759 @@
+package http
+
+//import (
+// "github.com/cenkalti/backoff/v4"
+// json "github.com/json-iterator/go"
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/spiral/roadrunner"
+// "github.com/spiral/roadrunner/service"
+// "github.com/spiral/roadrunner/service/env"
+// "github.com/spiral/roadrunner/service/rpc"
+// "github.com/stretchr/testify/assert"
+// "io/ioutil"
+// "net/http"
+// "os"
+// "testing"
+// "time"
+//)
+//
+//type testCfg struct {
+// httpCfg string
+// rpcCfg string
+// envCfg string
+// target string
+//}
+//
+//func (cfg *testCfg) Get(name string) service.Config {
+// if name == ID {
+// if cfg.httpCfg == "" {
+// return nil
+// }
+//
+// return &testCfg{target: cfg.httpCfg}
+// }
+//
+// if name == rpc.ID {
+// return &testCfg{target: cfg.rpcCfg}
+// }
+//
+// if name == env.ID {
+// return &testCfg{target: cfg.envCfg}
+// }
+//
+// return nil
+//}
+//func (cfg *testCfg) Unmarshal(out interface{}) error {
+// j := json.ConfigCompatibleWithStandardLibrary
+// return j.Unmarshal([]byte(cfg.target), out)
+//}
+//
+//func Test_Service_NoConfig(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{"Enable":true}`})
+// assert.Error(t, err)
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusInactive, st)
+//}
+//
+//func Test_Service_Configure_Disable(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusInactive, st)
+//}
+//
+//func Test_Service_Configure_Enable(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":8070",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_Echo(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6536",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 100)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// c.Stop()
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Env(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(env.ID, env.NewService(map[string]string{"pool": "test"}))
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":10031",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php env pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`, envCfg: `{"env_key":"ENV_VALUE"}`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:10031", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "ENV_VALUE", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// c.Stop()
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_ErrorEcho(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6030",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echoerr pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// goterr := make(chan interface{})
+// s.(*Service).AddListener(func(event int, ctx interface{}) {
+// if event == roadrunner.EventStderrOutput {
+// if string(ctx.([]byte)) == "WORLD\n" {
+// goterr <- nil
+// }
+// }
+// })
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// <-goterr
+//
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// c.Stop()
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Middleware(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6032",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc {
+// return func(w http.ResponseWriter, r *http.Request) {
+// if r.URL.Path == "/halt" {
+// w.WriteHeader(500)
+// _, err := w.Write([]byte("halted"))
+// if err != nil {
+// t.Errorf("error writing the data to the http reply: error %v", err)
+// }
+// } else {
+// f(w, r)
+// }
+// }
+// })
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err = http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// b, err = ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// assert.Equal(t, 500, r.StatusCode)
+// assert.Equal(t, "halted", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// c.Stop()
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_Listener(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6033",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// stop := make(chan interface{})
+// s.(*Service).AddListener(func(event int, ctx interface{}) {
+// if event == roadrunner.EventServerStart {
+// stop <- nil
+// }
+// })
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// c.Stop()
+// assert.True(t, true)
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Error(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6034",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "---",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// // assert error
+// err = c.Serve()
+// if err == nil {
+// return err
+// }
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Error2(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6035",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php broken pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// // assert error
+// err = c.Serve()
+// if err == nil {
+// return err
+// }
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Error3(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6036",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers"
+// "command": "php ../../tests/http/client.php broken pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// // assert error
+// if err == nil {
+// return err
+// }
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_Error4(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": "----",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php broken pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// // assert error
+// if err != nil {
+// return nil
+// }
+//
+// return err
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func tmpDir() string {
+// p := os.TempDir()
+// j := json.ConfigCompatibleWithStandardLibrary
+// r, _ := j.Marshal(p)
+//
+// return string(r)
+//}
diff --git a/plugins/http/request.go b/plugins/http/request.go
new file mode 100644
index 00000000..7e9839b2
--- /dev/null
+++ b/plugins/http/request.go
@@ -0,0 +1,183 @@
+package http
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+)
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+ contentNone = iota + 900
+ contentStream
+ contentMultipart
+ contentFormData
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
+ RemoteAddr string `json:"remoteAddr"`
+
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // URI contains full request URI with scheme and query.
+ URI string `json:"uri"`
+
+ // Header contains list of request headers.
+ Header http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
+ Attributes map[string]interface{} `json:"attributes"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+func fetchIP(pair string) string {
+ if !strings.ContainsRune(pair, ':') {
+ return pair
+ }
+
+ addr, _, _ := net.SplitHostPort(pair)
+ return addr
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request, cfg *UploadsConfig) (*Request, error) {
+ req := &Request{
+ RemoteAddr: fetchIP(r.RemoteAddr),
+ Protocol: r.Proto,
+ Method: r.Method,
+ URI: uri(r),
+ Header: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ //Attributes: attributes.All(r),
+ }
+
+ for _, c := range r.Cookies() {
+ if v, err := url.QueryUnescape(c.Value); err == nil {
+ req.Cookies[c.Name] = v
+ }
+ }
+
+ switch req.contentType() {
+ case contentNone:
+ return req, nil
+
+ case contentStream:
+ var err error
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+
+ case contentMultipart:
+ if err := r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ req.Uploads = parseUploads(r, cfg)
+ fallthrough
+ case contentFormData:
+ if err := r.ParseForm(); err != nil {
+ return nil, err
+ }
+
+ req.body = parseData(r)
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open(log log.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Open(log)
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close(log log.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear(log)
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (roadrunner.Payload, error) {
+ p := roadrunner.Payload{}
+
+ var err error
+ if p.Context, err = json.Marshal(r); err != nil {
+ return roadrunner.EmptyPayload, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = json.Marshal(r.body); err != nil {
+ return roadrunner.EmptyPayload, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// contentType returns the payload content type.
+func (r *Request) contentType() int {
+ if r.Method == "HEAD" || r.Method == "OPTIONS" {
+ return contentNone
+ }
+
+ ct := r.Header.Get("content-type")
+ if strings.Contains(ct, "application/x-www-form-urlencoded") {
+ return contentFormData
+ }
+
+ if strings.Contains(ct, "multipart/form-data") {
+ return contentMultipart
+ }
+
+ return contentStream
+}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.URL.Host != "" {
+ return r.URL.String()
+ }
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/plugins/http/response.go b/plugins/http/response.go
new file mode 100644
index 00000000..9ebc0632
--- /dev/null
+++ b/plugins/http/response.go
@@ -0,0 +1,105 @@
+package http
+
+import (
+ "io"
+ "net/http"
+ "strings"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2"
+)
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Header contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated body payload.
+ body interface{}
+}
+
+// NewResponse creates new response based on given pool payload.
+func NewResponse(p roadrunner.Payload) (*Response, error) {
+ r := &Response{body: p.Body}
+ j := json.ConfigCompatibleWithStandardLibrary
+ if err := j.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) error {
+ // INFO map is the reference type in golang
+ p := handlePushHeaders(r.Headers)
+ if pusher, ok := w.(http.Pusher); ok {
+ for _, v := range p {
+ err := pusher.Push(v, nil)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ handleTrailers(r.Headers)
+ for n, h := range r.Headers {
+ for _, v := range h {
+ w.Header().Add(n, v)
+ }
+ }
+
+ w.WriteHeader(r.Status)
+
+ if data, ok := r.body.([]byte); ok {
+ _, err := w.Write(data)
+ if err != nil {
+ return handleWriteError(err)
+ }
+ }
+
+ if rc, ok := r.body.(io.Reader); ok {
+ if _, err := io.Copy(w, rc); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func handlePushHeaders(h map[string][]string) []string {
+ var p []string
+ pushHeader, ok := h[http2pushHeaderKey]
+ if !ok {
+ return p
+ }
+
+ p = append(p, pushHeader...)
+
+ delete(h, http2pushHeaderKey)
+
+ return p
+}
+
+func handleTrailers(h map[string][]string) {
+ trailers, ok := h[trailerHeaderKey]
+ if !ok {
+ return
+ }
+
+ for _, tr := range trailers {
+ for _, n := range strings.Split(tr, ",") {
+ n = strings.Trim(n, "\t ")
+ if v, ok := h[n]; ok {
+ h["Trailer:"+n] = v
+
+ delete(h, n)
+ }
+ }
+ }
+
+ delete(h, trailerHeaderKey)
+}
diff --git a/plugins/http/response_test.go b/plugins/http/response_test.go
new file mode 100644
index 00000000..b5adbad9
--- /dev/null
+++ b/plugins/http/response_test.go
@@ -0,0 +1,162 @@
+package http
+
+import (
+ "bytes"
+ "errors"
+ "net/http"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2"
+ "github.com/stretchr/testify/assert"
+)
+
+type testWriter struct {
+ h http.Header
+ buf bytes.Buffer
+ wroteHeader bool
+ code int
+ err error
+ pushErr error
+ pushes []string
+}
+
+func (tw *testWriter) Header() http.Header { return tw.h }
+
+func (tw *testWriter) Write(p []byte) (int, error) {
+ if !tw.wroteHeader {
+ tw.WriteHeader(http.StatusOK)
+ }
+
+ n, e := tw.buf.Write(p)
+ if e == nil {
+ e = tw.err
+ }
+
+ return n, e
+}
+
+func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code }
+
+func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
+ tw.pushes = append(tw.pushes, target)
+
+ return tw.pushErr
+}
+
+func TestNewResponse_Error(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)})
+ assert.Error(t, err)
+ assert.Nil(t, r)
+}
+
+func TestNewResponse_Write(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ Body: []byte(`sample body`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "sample body", w.buf.String())
+}
+
+func TestNewResponse_Stream(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ // r is pointer, so, it might be nil
+ if r == nil {
+ t.Fatal("response is nil")
+ }
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "hello world", w.buf.String())
+}
+
+func TestNewResponse_StreamError(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ // r is pointer, so, it might be nil
+ if r == nil {
+ t.Fatal("response is nil")
+ }
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string)), err: errors.New("error")}
+ assert.Error(t, r.Write(w))
+}
+
+func TestWrite_HandlesPush(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{
+ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Nil(t, w.h["Http2-Push"])
+ assert.Equal(t, []string{"/test.js"}, w.pushes)
+}
+
+func TestWrite_HandlesTrailers(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{
+ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Nil(t, w.h[trailerHeaderKey])
+ assert.Nil(t, w.h["foo"]) //nolint:golint,staticcheck
+ assert.Nil(t, w.h["baz"]) //nolint:golint,staticcheck
+
+ assert.Equal(t, "test", w.h.Get("Trailer:foo"))
+ assert.Equal(t, "demo", w.h.Get("Trailer:bar"))
+}
+
+func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
+ r, err := NewResponse(roadrunner.Payload{
+ Context: []byte(
+ `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, "a", w.h.Get("Trailer:foo"))
+ assert.Equal(t, "b", w.h.Get("Trailer:bar"))
+ assert.Equal(t, "c", w.h.Get("Trailer:baz"))
+}
diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go
new file mode 100644
index 00000000..56d8f1a1
--- /dev/null
+++ b/plugins/http/rpc.go
@@ -0,0 +1,34 @@
+package http
+
+//import (
+// "github.com/pkg/errors"
+// "github.com/spiral/roadrunner/util"
+//)
+
+//type rpcServer struct{ svc *Plugin }
+//
+//// WorkerList contains list of workers.
+//type WorkerList struct {
+// // Workers is list of workers.
+// Workers []*util.State `json:"workers"`
+//}
+//
+//// Reset resets underlying RR worker pool and restarts all of it's workers.
+//func (rpc *rpcServer) Reset(reset bool, r *string) error {
+// if rpc.svc == nil || rpc.svc.handler == nil {
+// return errors.New("http server is not running")
+// }
+//
+// *r = "OK"
+// return rpc.svc.Server().Reset()
+//}
+//
+//// Workers returns list of active workers and their stats.
+//func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
+// if rpc.svc == nil || rpc.svc.handler == nil {
+// return errors.New("http server is not running")
+// }
+//
+// r.Workers, err = util.ServerState(rpc.svc.Server())
+// return err
+//}
diff --git a/plugins/http/rpc_test.go b/plugins/http/rpc_test.go
new file mode 100644
index 00000000..86499d46
--- /dev/null
+++ b/plugins/http/rpc_test.go
@@ -0,0 +1,222 @@
+package http
+
+//
+//import (
+// json "github.com/json-iterator/go"
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/spiral/roadrunner/service"
+// "github.com/spiral/roadrunner/service/rpc"
+// "github.com/stretchr/testify/assert"
+// "os"
+// "strconv"
+// "testing"
+// "time"
+//)
+//
+//func Test_RPC(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(rpc.ID, &rpc.Service{})
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{
+// rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+// httpCfg: `{
+// "enable": true,
+// "address": ":16031",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php pid pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`}))
+//
+// s, _ := c.Get(ID)
+// ss := s.(*Service)
+//
+// s2, _ := c.Get(rpc.ID)
+// rs := s2.(*rpc.Service)
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Second)
+//
+// res, _, err := get("http://localhost:16031")
+// if err != nil {
+// t.Fatal(err)
+// }
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res)
+//
+// cl, err := rs.Client()
+// assert.NoError(t, err)
+//
+// r := ""
+// assert.NoError(t, cl.Call("http.Reset", true, &r))
+// assert.Equal(t, "OK", r)
+//
+// res2, _, err := get("http://localhost:16031")
+// if err != nil {
+// t.Fatal(err)
+// }
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2)
+// assert.NotEqual(t, res, res2)
+// c.Stop()
+//}
+//
+//func Test_RPC_Unix(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(rpc.ID, &rpc.Service{})
+// c.Register(ID, &Service{})
+//
+// sock := `unix://` + os.TempDir() + `/rpc.unix`
+// j := json.ConfigCompatibleWithStandardLibrary
+// data, _ := j.Marshal(sock)
+//
+// assert.NoError(t, c.Init(&testCfg{
+// rpcCfg: `{"enable":true, "listen":` + string(data) + `}`,
+// httpCfg: `{
+// "enable": true,
+// "address": ":6032",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php pid pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`}))
+//
+// s, _ := c.Get(ID)
+// ss := s.(*Service)
+//
+// s2, _ := c.Get(rpc.ID)
+// rs := s2.(*rpc.Service)
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// res, _, err := get("http://localhost:6032")
+// if err != nil {
+// c.Stop()
+// t.Fatal(err)
+// }
+// if ss.pool.Workers() != nil && len(ss.pool.Workers()) > 0 {
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res)
+// } else {
+// c.Stop()
+// t.Fatal("no workers initialized")
+// }
+//
+// cl, err := rs.Client()
+// if err != nil {
+// c.Stop()
+// t.Fatal(err)
+// }
+//
+// r := ""
+// assert.NoError(t, cl.Call("http.Reset", true, &r))
+// assert.Equal(t, "OK", r)
+//
+// res2, _, err := get("http://localhost:6032")
+// if err != nil {
+// c.Stop()
+// t.Fatal(err)
+// }
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2)
+// assert.NotEqual(t, res, res2)
+// c.Stop()
+//}
+//
+//func Test_Workers(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(rpc.ID, &rpc.Service{})
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{
+// rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`,
+// httpCfg: `{
+// "enable": true,
+// "address": ":6033",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php pid pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`}))
+//
+// s, _ := c.Get(ID)
+// ss := s.(*Service)
+//
+// s2, _ := c.Get(rpc.ID)
+// rs := s2.(*rpc.Service)
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// cl, err := rs.Client()
+// assert.NoError(t, err)
+//
+// r := &WorkerList{}
+// assert.NoError(t, cl.Call("http.Workers", true, &r))
+// assert.Len(t, r.Workers, 1)
+//
+// assert.Equal(t, *ss.pool.Workers()[0].Pid, r.Workers[0].Pid)
+// c.Stop()
+//}
+//
+//func Test_Errors(t *testing.T) {
+// r := &rpcServer{nil}
+//
+// assert.Error(t, r.Reset(true, nil))
+// assert.Error(t, r.Workers(true, nil))
+//}
diff --git a/plugins/http/ssl_test.go b/plugins/http/ssl_test.go
new file mode 100644
index 00000000..df09aef5
--- /dev/null
+++ b/plugins/http/ssl_test.go
@@ -0,0 +1,255 @@
+package http
+
+//
+//import (
+// "crypto/tls"
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/spiral/roadrunner/service"
+// "github.com/stretchr/testify/assert"
+// "io/ioutil"
+// "net/http"
+// "testing"
+// "time"
+//)
+//
+//var sslClient = &http.Client{
+// Transport: &http.Transport{
+// TLSClientConfig: &tls.Config{
+// InsecureSkipVerify: true,
+// },
+// },
+//}
+//
+//func Test_SSL_Service_Echo(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6029",
+// "ssl": {
+// "port": 6900,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+//
+// c.Stop()
+//}
+//
+//func Test_SSL_Service_NoRedirect(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6030",
+// "ssl": {
+// "port": 6901,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+//
+// assert.Nil(t, r.TLS)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+// c.Stop()
+//}
+//
+//func Test_SSL_Service_Redirect(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6831",
+// "ssl": {
+// "port": 6902,
+// "redirect": true,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+// assert.NotNil(t, r.TLS)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+// c.Stop()
+//}
+//
+//func Test_SSL_Service_Push(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6032",
+// "ssl": {
+// "port": 6903,
+// "redirect": true,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php push pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+//
+// assert.NotNil(t, r.TLS)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.Equal(t, "", r.Header.Get("Http2-Push"))
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+// c.Stop()
+//}
diff --git a/plugins/http/test/.rr-http.yaml b/plugins/http/test/.rr-http.yaml
new file mode 100644
index 00000000..6fbfd378
--- /dev/null
+++ b/plugins/http/test/.rr-http.yaml
@@ -0,0 +1,37 @@
+server:
+ command: "php psr-worker.php"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+http:
+ debug: true
+ address: 0.0.0.0:8080
+ maxRequestSize: 200
+ middleware: [ "" ]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ numWorkers: 4
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s
+
+ # ssl:
+ # port: 443
+ # redirect: true
+ # cert: server.crt
+ # key: server.key
+ # rootCa: root.crt
+ fcgi:
+ address: tcp://0.0.0.0:6920
+ http2:
+ enabled: false
+ h2c: false
+ maxConcurrentStreams: 128
+
+
diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go
new file mode 100644
index 00000000..07925d33
--- /dev/null
+++ b/plugins/http/test/http_test.go
@@ -0,0 +1,72 @@
+package test
+
+import (
+ "os"
+ "os/signal"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ //rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestHTTPInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, ""))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: ".rr-http.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ //&rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &http.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ tt := time.NewTimer(time.Minute * 3)
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+}
diff --git a/plugins/http/test/psr-worker.php b/plugins/http/test/psr-worker.php
new file mode 100644
index 00000000..65fc6bde
--- /dev/null
+++ b/plugins/http/test/psr-worker.php
@@ -0,0 +1,23 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/../../vendor_php/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\PSR7Client($worker);
+
+while ($req = $psr7->acceptRequest()) {
+ try {
+ $resp = new \Zend\Diactoros\Response();
+ $resp->getBody()->write("hello world");
+
+ $psr7->respond($resp);
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+} \ No newline at end of file
diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go
new file mode 100644
index 00000000..2a1524ef
--- /dev/null
+++ b/plugins/http/uploads.go
@@ -0,0 +1,160 @@
+package http
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "os"
+ "sync"
+)
+
+const (
+ // UploadErrorOK - no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // UploadErrorNoFile - no file was uploaded.
+ UploadErrorNoFile = 4
+
+ // UploadErrorNoTmpDir - missing a temporary folder.
+ UploadErrorNoTmpDir = 5
+
+ // UploadErrorCantWrite - failed to write file to disk.
+ UploadErrorCantWrite = 6
+
+ // UploadErrorExtension - forbidden file extension.
+ UploadErrorExtension = 7
+)
+
+// Uploads tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // associated temp directory and forbidden extensions.
+ cfg *UploadsConfig
+
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ j := json.ConfigCompatibleWithStandardLibrary
+ return j.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually.
+func (u *Uploads) Open(log log.Logger) {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ err := f.Open(u.cfg)
+ if err != nil && log != nil {
+ log.Error("error opening the file", "err", err)
+ }
+ }(f)
+ }
+
+ wg.Wait()
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear(log log.Logger) {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ err := os.Remove(f.TempFilename)
+ if err != nil && log != nil {
+ log.Error("error removing the file", "err", err)
+ }
+ }
+ }
+}
+
+// FileUpload represents singular file NewUpload.
+type FileUpload struct {
+ // ID contains filename specified by the client.
+ Name string `json:"name"`
+
+ // Mime contains mime-type provided by the client.
+ Mime string `json:"mime"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+// NewUpload wraps net/http upload into PRS-7 compatible structure.
+func NewUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ Mime: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+// Open moves file content into temporary file available for PHP.
+// NOTE:
+// There is 2 deferred functions, and in case of getting 2 errors from both functions
+// error from close of temp file would be overwritten by error from the main file
+// STACK
+// DEFER FILE CLOSE (2)
+// DEFER TMP CLOSE (1)
+func (f *FileUpload) Open(cfg *UploadsConfig) (err error) {
+ if cfg.Forbids(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+
+ defer func() {
+ // close the main file
+ err = file.Close()
+ }()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer func() {
+ // close the temp file
+ err = tmp.Close()
+ }()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
+
+// exists if file exists.
+func exists(path string) bool {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ return false
+ }
+ return true
+}
diff --git a/plugins/http/uploads_config.go b/plugins/http/uploads_config.go
new file mode 100644
index 00000000..3f655064
--- /dev/null
+++ b/plugins/http/uploads_config.go
@@ -0,0 +1,45 @@
+package http
+
+import (
+ "os"
+ "path"
+ "strings"
+)
+
+// UploadsConfig describes file location and controls access to them.
+type UploadsConfig struct {
+ // Dir contains name of directory to control access to.
+ Dir string
+
+ // Forbid specifies list of file extensions which are forbidden for access.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ Forbid []string
+}
+
+// InitDefaults sets missing values to their default values.
+func (cfg *UploadsConfig) InitDefaults() error {
+ cfg.Forbid = []string{".php", ".exe", ".bat"}
+ return nil
+}
+
+// TmpDir returns temporary directory.
+func (cfg *UploadsConfig) TmpDir() string {
+ if cfg.Dir != "" {
+ return cfg.Dir
+ }
+
+ return os.TempDir()
+}
+
+// Forbids must return true if file extension is not allowed for the upload.
+func (cfg *UploadsConfig) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/plugins/http/uploads_config_test.go b/plugins/http/uploads_config_test.go
new file mode 100644
index 00000000..2b6ceebc
--- /dev/null
+++ b/plugins/http/uploads_config_test.go
@@ -0,0 +1,24 @@
+package http
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestFsConfig_Forbids(t *testing.T) {
+ cfg := UploadsConfig{Forbid: []string{".php"}}
+
+ assert.True(t, cfg.Forbids("index.php"))
+ assert.True(t, cfg.Forbids("index.PHP"))
+ assert.True(t, cfg.Forbids("phpadmin/index.bak.php"))
+ assert.False(t, cfg.Forbids("index.html"))
+}
+
+func TestFsConfig_TmpFallback(t *testing.T) {
+ cfg := UploadsConfig{Dir: "test"}
+ assert.Equal(t, "test", cfg.TmpDir())
+
+ cfg = UploadsConfig{Dir: ""}
+ assert.Equal(t, os.TempDir(), cfg.TmpDir())
+}
diff --git a/plugins/http/uploads_test.go b/plugins/http/uploads_test.go
new file mode 100644
index 00000000..b023b28f
--- /dev/null
+++ b/plugins/http/uploads_test.go
@@ -0,0 +1,435 @@
+package http
+
+//
+//import (
+// "bytes"
+// "context"
+// "crypto/md5"
+// "encoding/hex"
+// "fmt"
+// "io"
+// "io/ioutil"
+// "mime/multipart"
+// "net/http"
+// "os"
+// "testing"
+// "time"
+//
+// json "github.com/json-iterator/go"
+// "github.com/stretchr/testify/assert"
+//)
+//
+//func TestHandler_Upload_File(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 0, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+//}
+//
+//func TestHandler_Upload_NestedFile(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 0, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b))
+//}
+//
+//func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: "-----",
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 5, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+//}
+//
+//func TestHandler_Upload_File_Forbids(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 7, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+//}
+//
+//func Test_FileExists(t *testing.T) {
+// assert.True(t, exists("uploads_test.go"))
+// assert.False(t, exists("uploads_test."))
+//}
+//
+//func mustOpen(f string) *os.File {
+// r, err := os.Open(f)
+// if err != nil {
+// panic(err)
+// }
+// return r
+//}
+//
+//type fInfo struct {
+// Name string `json:"name"`
+// Size int64 `json:"size"`
+// Mime string `json:"mime"`
+// Error int `json:"error"`
+// MD5 string `json:"md5,omitempty"`
+//}
+//
+//func fileString(f string, errNo int, mime string) string {
+// s, err := os.Stat(f)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error stat the file, error: %v", err))
+// }
+//
+// ff, err := os.Open(f)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error opening the file, error: %v", err))
+// }
+//
+// defer func() {
+// er := ff.Close()
+// if er != nil {
+// fmt.Println(fmt.Errorf("error closing the file, error: %v", er))
+// }
+// }()
+//
+// h := md5.New()
+// _, err = io.Copy(h, ff)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error copying the file, error: %v", err))
+// }
+//
+// v := &fInfo{
+// Name: s.Name(),
+// Size: s.Size(),
+// Error: errNo,
+// Mime: mime,
+// MD5: hex.EncodeToString(h.Sum(nil)),
+// }
+//
+// if errNo != 0 {
+// v.MD5 = ""
+// v.Size = 0
+// }
+//
+// j := json.ConfigCompatibleWithStandardLibrary
+// r, err := j.Marshal(v)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err))
+// }
+// return string(r)
+//
+//}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index e096708a..4d606390 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -25,49 +25,47 @@ type Plugin struct {
}
// Init application provider.
-func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error {
const op = errors.Op("Init")
- err := cfg.UnmarshalKey(ServiceName, &app.cfg)
+ err := cfg.UnmarshalKey(ServiceName, &server.cfg)
if err != nil {
return errors.E(op, errors.Init, err)
}
- app.cfg.InitDefaults()
- app.log = log
+ server.cfg.InitDefaults()
+ server.log = log
+
+ server.factory, err = server.initFactory()
+ if err != nil {
+ return errors.E(errors.Op("Init factory"), err)
+ }
return nil
}
// Name contains service name.
-func (app *Plugin) Name() string {
+func (server *Plugin) Name() string {
return ServiceName
}
-func (app *Plugin) Serve() chan error {
+func (server *Plugin) Serve() chan error {
errCh := make(chan error, 1)
- var err error
-
- app.factory, err = app.initFactory()
- if err != nil {
- errCh <- errors.E(errors.Op("init factory"), err)
- }
-
return errCh
}
-func (app *Plugin) Stop() error {
- if app.factory == nil {
+func (server *Plugin) Stop() error {
+ if server.factory == nil {
return nil
}
- return app.factory.Close(context.Background())
+ return server.factory.Close(context.Background())
}
// CmdFactory provides worker command factory assocated with given context.
-func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
+func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
+ cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...)
return func() *exec.Cmd {
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
@@ -75,67 +73,67 @@ func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
// if user is not empty, and OS is linux or macos
// execute php worker from that particular user
- if app.cfg.User != "" {
- err := util.ExecuteFromUser(cmd, app.cfg.User)
+ if server.cfg.User != "" {
+ err := util.ExecuteFromUser(cmd, server.cfg.User)
if err != nil {
return nil
}
}
- cmd.Env = app.setEnv(env)
+ cmd.Env = server.setEnv(env)
return cmd
}, nil
}
// NewWorker issues new standalone worker.
-func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
+func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
const op = errors.Op("new worker")
- spawnCmd, err := app.CmdFactory(env)
+ spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
}
- w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+ w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd())
if err != nil {
return nil, errors.E(op, err)
}
- w.AddListener(app.collectLogs)
+ w.AddListener(server.collectLogs)
return w, nil
}
// NewWorkerPool issues new worker pool.
-func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
- spawnCmd, err := app.CmdFactory(env)
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
+ spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, err
}
- p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt)
if err != nil {
return nil, err
}
- p.AddListener(app.collectLogs)
+ p.AddListener(server.collectLogs)
return p, nil
}
// creates relay and worker factory.
-func (app *Plugin) initFactory() (roadrunner.Factory, error) {
+func (server *Plugin) initFactory() (roadrunner.Factory, error) {
const op = errors.Op("network factory init")
- if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
+ if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
- dsn := strings.Split(app.cfg.Relay, "://")
+ dsn := strings.Split(server.cfg.Relay, "://")
if len(dsn) != 2 {
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
- lsn, err := util.CreateListener(app.cfg.Relay)
+ lsn, err := util.CreateListener(server.cfg.Relay)
if err != nil {
return nil, errors.E(op, errors.Network, err)
}
@@ -143,16 +141,16 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
case "tcp":
- return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
-func (app *Plugin) setEnv(e server.Env) []string {
- env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
+func (server *Plugin) setEnv(e server.Env) []string {
+ env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay))
for k, v := range e {
env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
}
@@ -160,13 +158,13 @@ func (app *Plugin) setEnv(e server.Env) []string {
return env
}
-func (app *Plugin) collectLogs(event interface{}) {
+func (server *Plugin) collectLogs(event interface{}) {
if we, ok := event.(roadrunner.WorkerEvent); ok {
switch we.Event {
case roadrunner.EventWorkerError:
- app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
+ server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
case roadrunner.EventWorkerLog:
- app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
+ server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
}
}
}
diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php
new file mode 100644
index 00000000..3fcf8e29
--- /dev/null
+++ b/src/Diactoros/ServerRequestFactory.php
@@ -0,0 +1,26 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Diactoros;
+
+use Psr\Http\Message\ServerRequestFactoryInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Zend\Diactoros\ServerRequest;
+
+final class ServerRequestFactory implements ServerRequestFactoryInterface
+{
+ /**
+ * @inheritdoc
+ */
+ public function createServerRequest(string $method, $uri, array $serverParams = []): ServerRequestInterface
+ {
+ $uploadedFiles = [];
+ return new ServerRequest($serverParams, $uploadedFiles, $uri, $method);
+ }
+}
diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php
new file mode 100644
index 00000000..cc0a5306
--- /dev/null
+++ b/src/Diactoros/StreamFactory.php
@@ -0,0 +1,57 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Diactoros;
+
+use RuntimeException;
+use Psr\Http\Message\StreamFactoryInterface;
+use Psr\Http\Message\StreamInterface;
+use Zend\Diactoros\Stream;
+
+final class StreamFactory implements StreamFactoryInterface
+{
+ /**
+ * @inheritdoc
+ * @throws RuntimeException
+ */
+ public function createStream(string $content = ''): StreamInterface
+ {
+ $resource = fopen('php://temp', 'rb+');
+
+ if (! \is_resource($resource)) {
+ throw new RuntimeException('Cannot create stream');
+ }
+
+ fwrite($resource, $content);
+ rewind($resource);
+ return $this->createStreamFromResource($resource);
+ }
+
+ /**
+ * @inheritdoc
+ */
+ public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface
+ {
+ $resource = fopen($file, $mode);
+
+ if (! \is_resource($resource)) {
+ throw new RuntimeException('Cannot create stream');
+ }
+
+ return $this->createStreamFromResource($resource);
+ }
+
+ /**
+ * @inheritdoc
+ */
+ public function createStreamFromResource($resource): StreamInterface
+ {
+ return new Stream($resource);
+ }
+}
diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php
new file mode 100644
index 00000000..45773287
--- /dev/null
+++ b/src/Diactoros/UploadedFileFactory.php
@@ -0,0 +1,36 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Diactoros;
+
+use Psr\Http\Message\StreamInterface;
+use Psr\Http\Message\UploadedFileFactoryInterface;
+use Psr\Http\Message\UploadedFileInterface;
+use Zend\Diactoros\UploadedFile;
+
+final class UploadedFileFactory implements UploadedFileFactoryInterface
+{
+ /**
+ * @inheritdoc
+ */
+ public function createUploadedFile(
+ StreamInterface $stream,
+ int $size = null,
+ int $error = \UPLOAD_ERR_OK,
+ string $clientFilename = null,
+ string $clientMediaType = null
+ ): UploadedFileInterface {
+ if ($size === null) {
+ $size = (int) $stream->getSize();
+ }
+
+ /** @var resource $stream */
+ return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType);
+ }
+}
diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php
index d5b738b8..d5b738b8 100755..100644
--- a/src/Exception/MetricException.php
+++ b/src/Exception/MetricException.php
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php
index cd657502..cd657502 100755..100644
--- a/src/Exception/RoadRunnerException.php
+++ b/src/Exception/RoadRunnerException.php
diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php
new file mode 100644
index 00000000..43967893
--- /dev/null
+++ b/src/Exceptions/RoadRunnerException.php
@@ -0,0 +1,18 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Exceptions;
+
+/**
+ * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead
+ */
+class RoadRunnerException extends \RuntimeException
+{
+}
diff --git a/src/HttpClient.php b/src/HttpClient.php
new file mode 100644
index 00000000..4ca152c8
--- /dev/null
+++ b/src/HttpClient.php
@@ -0,0 +1,75 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Alex Bond
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+final class HttpClient
+{
+ /** @var Worker */
+ private $worker;
+
+ /**
+ * @param Worker $worker
+ */
+ public function __construct(Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->worker;
+ }
+
+ /**
+ * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string]
+ * or null if termination request or invalid context.
+ */
+ public function acceptRequest(): ?array
+ {
+ $body = $this->getWorker()->receive($ctx);
+ if (empty($body) && empty($ctx)) {
+ // termination request
+ return null;
+ }
+
+ $ctx = json_decode($ctx, true);
+ if ($ctx === null) {
+ // invalid context
+ return null;
+ }
+
+ return ['ctx' => $ctx, 'body' => $body];
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param int $status Http status code
+ * @param string $body Body of response
+ * @param string[][] $headers An associative array of the message's headers. Each
+ * key MUST be a header name, and each value MUST be an array of strings
+ * for that header.
+ */
+ public function respond(int $status, string $body, array $headers = []): void
+ {
+ if (empty($headers)) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->getWorker()->send(
+ $body,
+ (string) json_encode(['status' => $status, 'headers' => $headers])
+ );
+ }
+}
diff --git a/src/Metrics.php b/src/Metrics.php
new file mode 100644
index 00000000..d6b6e1da
--- /dev/null
+++ b/src/Metrics.php
@@ -0,0 +1,80 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exceptions\RPCException;
+use Spiral\Goridge\RPC;
+use Spiral\RoadRunner\Exception\MetricException;
+
+/**
+ * Application metrics.
+ */
+final class Metrics implements MetricsInterface
+{
+ /** @var RPC */
+ private $rpc;
+
+ /**
+ * @param RPC $rpc
+ */
+ public function __construct(RPC $rpc)
+ {
+ $this->rpc = $rpc;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function add(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sub(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function observe(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function set(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+}
diff --git a/src/MetricsInterface.php b/src/MetricsInterface.php
new file mode 100644
index 00000000..ec2009b0
--- /dev/null
+++ b/src/MetricsInterface.php
@@ -0,0 +1,64 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\MetricException;
+
+interface MetricsInterface
+{
+ /**
+ * Add collector value. Fallback to appropriate method of related collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function add(string $collector, float $value, array $labels = []);
+
+ /**
+ * Subtract the collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function sub(string $collector, float $value, array $labels = []);
+
+ /**
+ * Observe collector value, only for histogram and summary collectors.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function observe(string $collector, float $value, array $labels = []);
+
+ /**
+ * Set collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function set(string $collector, float $value, array $labels = []);
+}
diff --git a/src/PSR7Client.php b/src/PSR7Client.php
new file mode 100644
index 00000000..777dd891
--- /dev/null
+++ b/src/PSR7Client.php
@@ -0,0 +1,217 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestFactoryInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Message\StreamFactoryInterface;
+use Psr\Http\Message\UploadedFileFactoryInterface;
+use Psr\Http\Message\UploadedFileInterface;
+
+/**
+ * Manages PSR-7 request and response.
+ */
+class PSR7Client
+{
+ /** @var HttpClient */
+ private $httpClient;
+
+ /** @var ServerRequestFactoryInterface */
+ private $requestFactory;
+
+ /** @var StreamFactoryInterface */
+ private $streamFactory;
+
+ /** @var UploadedFileFactoryInterface */
+ private $uploadsFactory;
+
+ /** @var mixed[] */
+ private $originalServer = [];
+
+ /** @var string[] Valid values for HTTP protocol version */
+ private static $allowedVersions = ['1.0', '1.1', '2',];
+
+ /**
+ * @param Worker $worker
+ * @param ServerRequestFactoryInterface|null $requestFactory
+ * @param StreamFactoryInterface|null $streamFactory
+ * @param UploadedFileFactoryInterface|null $uploadsFactory
+ */
+ public function __construct(
+ Worker $worker,
+ ServerRequestFactoryInterface $requestFactory = null,
+ StreamFactoryInterface $streamFactory = null,
+ UploadedFileFactoryInterface $uploadsFactory = null
+ ) {
+ $this->httpClient = new HttpClient($worker);
+ $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory();
+ $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory();
+ $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory();
+ $this->originalServer = $_SERVER;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->httpClient->getWorker();
+ }
+
+ /**
+ * @return ServerRequestInterface|null
+ */
+ public function acceptRequest(): ?ServerRequestInterface
+ {
+ $rawRequest = $this->httpClient->acceptRequest();
+ if ($rawRequest === null) {
+ return null;
+ }
+
+ $_SERVER = $this->configureServer($rawRequest['ctx']);
+
+ $request = $this->requestFactory->createServerRequest(
+ $rawRequest['ctx']['method'],
+ $rawRequest['ctx']['uri'],
+ $_SERVER
+ );
+
+ parse_str($rawRequest['ctx']['rawQuery'], $query);
+
+ $request = $request
+ ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol']))
+ ->withCookieParams($rawRequest['ctx']['cookies'])
+ ->withQueryParams($query)
+ ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads']));
+
+ foreach ($rawRequest['ctx']['attributes'] as $name => $value) {
+ $request = $request->withAttribute($name, $value);
+ }
+
+ foreach ($rawRequest['ctx']['headers'] as $name => $value) {
+ $request = $request->withHeader($name, $value);
+ }
+
+ if ($rawRequest['ctx']['parsed']) {
+ return $request->withParsedBody(json_decode($rawRequest['body'], true));
+ }
+
+ if ($rawRequest['body'] !== null) {
+ return $request->withBody($this->streamFactory->createStream($rawRequest['body']));
+ }
+
+ return $request;
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param ResponseInterface $response
+ */
+ public function respond(ResponseInterface $response): void
+ {
+ $this->httpClient->respond(
+ $response->getStatusCode(),
+ $response->getBody()->__toString(),
+ $response->getHeaders()
+ );
+ }
+
+ /**
+ * Returns altered copy of _SERVER variable. Sets ip-address,
+ * request-time and other values.
+ *
+ * @param mixed[] $ctx
+ * @return mixed[]
+ */
+ protected function configureServer(array $ctx): array
+ {
+ $server = $this->originalServer;
+
+ $server['REQUEST_URI'] = $ctx['uri'];
+ $server['REQUEST_TIME'] = time();
+ $server['REQUEST_TIME_FLOAT'] = microtime(true);
+ $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
+ $server['REQUEST_METHOD'] = $ctx['method'];
+
+ $server['HTTP_USER_AGENT'] = '';
+ foreach ($ctx['headers'] as $key => $value) {
+ $key = strtoupper(str_replace('-', '_', $key));
+ if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
+ $server[$key] = implode(', ', $value);
+ } else {
+ $server['HTTP_' . $key] = implode(', ', $value);
+ }
+ }
+
+ return $server;
+ }
+
+ /**
+ * Wraps all uploaded files with UploadedFile.
+ *
+ * @param array[] $files
+ *
+ * @return UploadedFileInterface[]|mixed[]
+ */
+ private function wrapUploads($files): array
+ {
+ if (empty($files)) {
+ return [];
+ }
+
+ $result = [];
+ foreach ($files as $index => $f) {
+ if (!isset($f['name'])) {
+ $result[$index] = $this->wrapUploads($f);
+ continue;
+ }
+
+ if (UPLOAD_ERR_OK === $f['error']) {
+ $stream = $this->streamFactory->createStreamFromFile($f['tmpName']);
+ } else {
+ $stream = $this->streamFactory->createStream();
+ }
+
+ $result[$index] = $this->uploadsFactory->createUploadedFile(
+ $stream,
+ $f['size'],
+ $f['error'],
+ $f['name'],
+ $f['mime']
+ );
+ }
+
+ return $result;
+ }
+
+ /**
+ * Normalize HTTP protocol version to valid values
+ *
+ * @param string $version
+ * @return string
+ */
+ private static function fetchProtocolVersion(string $version): string
+ {
+ $v = substr($version, 5);
+
+ if ($v === '2.0') {
+ return '2';
+ }
+
+ // Fallback for values outside of valid protocol versions
+ if (!in_array($v, static::$allowedVersions, true)) {
+ return '1.1';
+ }
+
+ return $v;
+ }
+}
diff --git a/src/Worker.php b/src/Worker.php
index d509562e..d509562e 100755..100644
--- a/src/Worker.php
+++ b/src/Worker.php
diff --git a/static_pool.go b/static_pool.go
index c1dacd8d..d5511018 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -54,6 +54,9 @@ type StaticPool struct {
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) {
const op = errors.Op("NewPool")
+ if factory == nil {
+ return nil, errors.E(op, errors.Str("no factory initialized"))
+ }
cfg.InitDefaults()
if cfg.Debug {
@@ -75,13 +78,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo
workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// put stack in the pool
err = p.ww.AddToWatch(ctx, workers)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
p.errEncoder = defaultErrEncoder(p)