diff options
48 files changed, 7299 insertions, 163 deletions
diff --git a/composer.json b/composer.json index aef75b08..283eaab1 100755 --- a/composer.json +++ b/composer.json @@ -18,14 +18,15 @@ "ext-json": "*", "ext-curl": "*", "spiral/goridge": "^2.4.2", - "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0" + "psr/http-factory": "^1.0", + "psr/http-message": "^1.0", + "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0", + "laminas/laminas-diactoros": "^1.3 || ^2.0" }, "config": { "vendor-dir": "vendor_php" }, "require-dev": { - "psr/http-factory": "^1.0", - "psr/http-message": "^1.0", "phpstan/phpstan": "~0.12" }, "scripts": { diff --git a/composer.lock b/composer.lock index 183f9fef..ded194f5 100755 --- a/composer.lock +++ b/composer.lock @@ -4,9 +4,168 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "95535b37e4eb6476a2f89ea1b0f16e48", + "content-hash": "439018483d4d3a37c3d369d2587b8311", "packages": [ { + "name": "laminas/laminas-diactoros", + "version": "2.4.1", + "source": { + "type": "git", + "url": "https://github.com/laminas/laminas-diactoros.git", + "reference": "36ef09b73e884135d2059cc498c938e90821bb57" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/laminas/laminas-diactoros/zipball/36ef09b73e884135d2059cc498c938e90821bb57", + "reference": "36ef09b73e884135d2059cc498c938e90821bb57", + "shasum": "" + }, + "require": { + "laminas/laminas-zendframework-bridge": "^1.0", + "php": "^7.1", + "psr/http-factory": "^1.0", + "psr/http-message": "^1.0" + }, + "conflict": { + "phpspec/prophecy": "<1.9.0" + }, + "provide": { + "psr/http-factory-implementation": "1.0", + "psr/http-message-implementation": "1.0" + }, + "replace": { + "zendframework/zend-diactoros": "^2.2.1" + }, + "require-dev": { + "ext-curl": "*", + "ext-dom": "*", + "ext-gd": "*", + "ext-libxml": "*", + "http-interop/http-factory-tests": "^0.5.0", + "laminas/laminas-coding-standard": "~1.0.0", + "php-http/psr7-integration-tests": "^1.0", + "phpunit/phpunit": "^7.5.18" + }, + "type": "library", + "extra": { + "laminas": { + "config-provider": "Laminas\\Diactoros\\ConfigProvider", + "module": "Laminas\\Diactoros" + } + }, + "autoload": { + "files": [ + "src/functions/create_uploaded_file.php", + "src/functions/marshal_headers_from_sapi.php", + "src/functions/marshal_method_from_sapi.php", + "src/functions/marshal_protocol_version_from_sapi.php", + "src/functions/marshal_uri_from_sapi.php", + "src/functions/normalize_server.php", + "src/functions/normalize_uploaded_files.php", + "src/functions/parse_cookie_header.php", + "src/functions/create_uploaded_file.legacy.php", + "src/functions/marshal_headers_from_sapi.legacy.php", + "src/functions/marshal_method_from_sapi.legacy.php", + "src/functions/marshal_protocol_version_from_sapi.legacy.php", + "src/functions/marshal_uri_from_sapi.legacy.php", + "src/functions/normalize_server.legacy.php", + "src/functions/normalize_uploaded_files.legacy.php", + "src/functions/parse_cookie_header.legacy.php" + ], + "psr-4": { + "Laminas\\Diactoros\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "description": "PSR HTTP Message implementations", + "homepage": "https://laminas.dev", + "keywords": [ + "http", + "laminas", + "psr", + "psr-17", + "psr-7" + ], + "support": { + "chat": "https://laminas.dev/chat", + "docs": "https://docs.laminas.dev/laminas-diactoros/", + "forum": "https://discourse.laminas.dev", + "issues": "https://github.com/laminas/laminas-diactoros/issues", + "rss": "https://github.com/laminas/laminas-diactoros/releases.atom", + "source": "https://github.com/laminas/laminas-diactoros" + }, + "funding": [ + { + "url": "https://funding.communitybridge.org/projects/laminas-project", + "type": "community_bridge" + } + ], + "time": "2020-09-03T14:29:41+00:00" + }, + { + "name": "laminas/laminas-zendframework-bridge", + "version": "1.1.1", + "source": { + "type": "git", + "url": "https://github.com/laminas/laminas-zendframework-bridge.git", + "reference": "6ede70583e101030bcace4dcddd648f760ddf642" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/laminas/laminas-zendframework-bridge/zipball/6ede70583e101030bcace4dcddd648f760ddf642", + "reference": "6ede70583e101030bcace4dcddd648f760ddf642", + "shasum": "" + }, + "require": { + "php": "^5.6 || ^7.0 || ^8.0" + }, + "require-dev": { + "phpunit/phpunit": "^5.7 || ^6.5 || ^7.5 || ^8.1 || ^9.3", + "squizlabs/php_codesniffer": "^3.5" + }, + "type": "library", + "extra": { + "laminas": { + "module": "Laminas\\ZendFrameworkBridge" + } + }, + "autoload": { + "files": [ + "src/autoload.php" + ], + "psr-4": { + "Laminas\\ZendFrameworkBridge\\": "src//" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "description": "Alias legacy ZF class names to Laminas Project equivalents.", + "keywords": [ + "ZendFramework", + "autoloading", + "laminas", + "zf" + ], + "support": { + "forum": "https://discourse.laminas.dev/", + "issues": "https://github.com/laminas/laminas-zendframework-bridge/issues", + "rss": "https://github.com/laminas/laminas-zendframework-bridge/releases.atom", + "source": "https://github.com/laminas/laminas-zendframework-bridge" + }, + "funding": [ + { + "url": "https://funding.communitybridge.org/projects/laminas-project", + "type": "community_bridge" + } + ], + "time": "2020-09-14T14:23:00+00:00" + }, + { "name": "psr/container", "version": "1.0.0", "source": { @@ -60,6 +219,114 @@ "time": "2017-02-14T16:28:37+00:00" }, { + "name": "psr/http-factory", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-factory.git", + "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-factory/zipball/12ac7fcd07e5b077433f5f2bee95b3a771bf61be", + "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be", + "shasum": "" + }, + "require": { + "php": ">=7.0.0", + "psr/http-message": "^1.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interfaces for PSR-7 HTTP message factories", + "keywords": [ + "factory", + "http", + "message", + "psr", + "psr-17", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-factory/tree/master" + }, + "time": "2019-04-30T12:38:16+00:00" + }, + { + "name": "psr/http-message", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-message.git", + "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363", + "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363", + "shasum": "" + }, + "require": { + "php": ">=5.3.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP messages", + "homepage": "https://github.com/php-fig/http-message", + "keywords": [ + "http", + "http-message", + "psr", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-message/tree/master" + }, + "time": "2016-08-06T14:39:51+00:00" + }, + { "name": "spiral/goridge", "version": "v2.4.5", "source": { @@ -851,16 +1118,16 @@ "packages-dev": [ { "name": "phpstan/phpstan", - "version": "0.12.53", + "version": "0.12.56", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "dbbdb0d7c2434ecd5289f6114d16473e694caa67" + "reference": "007fd5d700c41e1bb27795fae15a2383f8fa4ba1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/dbbdb0d7c2434ecd5289f6114d16473e694caa67", - "reference": "dbbdb0d7c2434ecd5289f6114d16473e694caa67", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/007fd5d700c41e1bb27795fae15a2383f8fa4ba1", + "reference": "007fd5d700c41e1bb27795fae15a2383f8fa4ba1", "shasum": "" }, "require": { @@ -891,7 +1158,7 @@ "description": "PHPStan - PHP Static Analysis Tool", "support": { "issues": "https://github.com/phpstan/phpstan/issues", - "source": "https://github.com/phpstan/phpstan/tree/0.12.53" + "source": "https://github.com/phpstan/phpstan/tree/0.12.56" }, "funding": [ { @@ -907,115 +1174,7 @@ "type": "tidelift" } ], - "time": "2020-11-01T14:51:50+00:00" - }, - { - "name": "psr/http-factory", - "version": "1.0.1", - "source": { - "type": "git", - "url": "https://github.com/php-fig/http-factory.git", - "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-fig/http-factory/zipball/12ac7fcd07e5b077433f5f2bee95b3a771bf61be", - "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be", - "shasum": "" - }, - "require": { - "php": ">=7.0.0", - "psr/http-message": "^1.0" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "1.0.x-dev" - } - }, - "autoload": { - "psr-4": { - "Psr\\Http\\Message\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "PHP-FIG", - "homepage": "http://www.php-fig.org/" - } - ], - "description": "Common interfaces for PSR-7 HTTP message factories", - "keywords": [ - "factory", - "http", - "message", - "psr", - "psr-17", - "psr-7", - "request", - "response" - ], - "support": { - "source": "https://github.com/php-fig/http-factory/tree/master" - }, - "time": "2019-04-30T12:38:16+00:00" - }, - { - "name": "psr/http-message", - "version": "1.0.1", - "source": { - "type": "git", - "url": "https://github.com/php-fig/http-message.git", - "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363", - "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363", - "shasum": "" - }, - "require": { - "php": ">=5.3.0" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "1.0.x-dev" - } - }, - "autoload": { - "psr-4": { - "Psr\\Http\\Message\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "PHP-FIG", - "homepage": "http://www.php-fig.org/" - } - ], - "description": "Common interface for HTTP messages", - "homepage": "https://github.com/php-fig/http-message", - "keywords": [ - "http", - "http-message", - "psr", - "psr-7", - "request", - "response" - ], - "support": { - "source": "https://github.com/php-fig/http-message/tree/master" - }, - "time": "2016-08-06T14:39:51+00:00" + "time": "2020-11-16T22:59:18+00:00" } ], "aliases": [], @@ -3,11 +3,14 @@ module github.com/spiral/roadrunner/v2 go 1.15 require ( + github.com/cenkalti/backoff/v4 v4.1.0 github.com/fatih/color v1.10.0 + github.com/hashicorp/go-multierror v1.0.0 github.com/json-iterator/go v1.1.10 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 github.com/shirou/gopsutil v3.20.10+incompatible + github.com/sirupsen/logrus v1.6.0 github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta19 github.com/spiral/errors v1.0.4 @@ -16,6 +19,7 @@ require ( github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 + golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 -)
\ No newline at end of file +) @@ -115,10 +115,12 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= @@ -236,6 +238,7 @@ github.com/shirou/gopsutil v3.20.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -341,7 +344,9 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 h1:MsuvTghUPjX762sGLnGsxC3HM0B5r83wEtYcYR8/vRs= golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/pipe_factory.go b/pipe_factory.go index 15f38e42..76a3780e 100755 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -11,8 +11,7 @@ import ( // PipeFactory connects to stack using standard // streams (STDIN, STDOUT pipes). -type PipeFactory struct { -} +type PipeFactory struct{} // NewPipeFactory returns new factory instance and starts // listening diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go new file mode 100644 index 00000000..77d6ea69 --- /dev/null +++ b/plugins/http/attributes/attributes.go @@ -0,0 +1,76 @@ +package attributes + +import ( + "context" + "errors" + "net/http" +) + +type attrKey int + +const contextKey attrKey = iota + +type attrs map[string]interface{} + +func (v attrs) get(key string) interface{} { + if v == nil { + return "" + } + + return v[key] +} + +func (v attrs) set(key string, value interface{}) { + v[key] = value +} + +func (v attrs) del(key string) { + delete(v, key) +} + +// Init returns request with new context and attribute bag. +func Init(r *http.Request) *http.Request { + return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{})) +} + +// All returns all context attributes. +func All(r *http.Request) map[string]interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return attrs{} + } + + return v.(attrs) +} + +// Get gets the value from request context. It replaces any existing +// values. +func Get(r *http.Request, key string) interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return nil + } + + return v.(attrs).get(key) +} + +// Set sets the key to value. It replaces any existing +// values. Context specific. +func Set(r *http.Request, key string, value interface{}) error { + v := r.Context().Value(contextKey) + if v == nil { + return errors.New("unable to find `psr:attributes` context key") + } + + v.(attrs).set(key, value) + return nil +} + +// Delete deletes values associated with attribute key. +func (v attrs) Delete(key string) { + if v == nil { + return + } + + v.del(key) +} diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go new file mode 100644 index 00000000..a4c85eea --- /dev/null +++ b/plugins/http/attributes/attributes_test.go @@ -0,0 +1,79 @@ +package attributes + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAllAttributes(t *testing.T) { + r := &http.Request{} + r = Init(r) + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + + assert.Equal(t, All(r), map[string]interface{}{ + "key": "value", + }) +} + +func TestAllAttributesNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestAllAttributesNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestGetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + assert.Equal(t, Get(r, "key"), "value") +} + +func TestGetAttributeNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestGetAttributeNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestSetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + assert.Equal(t, Get(r, "key"), "value") +} + +func TestSetAttributeNone(t *testing.T) { + r := &http.Request{} + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + assert.Equal(t, Get(r, "key"), nil) +} diff --git a/plugins/http/config.go b/plugins/http/config.go new file mode 100644 index 00000000..b3f4ca13 --- /dev/null +++ b/plugins/http/config.go @@ -0,0 +1,287 @@ +package http + +import ( + "errors" + "fmt" + "net" + "os" + "strings" + "time" + + "github.com/spiral/roadrunner/v2" +) + +type ServerConfig struct { + // Command includes command strings with all the parameters, example: "php worker.php pipes". + Command string + + // User under which process will be started + User string + + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://pool.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. + RelayTimeout time.Duration + + // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change + // while server is running. + + + env map[string]string +} + +// Config configures RoadRunner HTTP server. +type Config struct { + // Port and port to handle as http server. + Address string + + // SSL defines https server options. + SSL SSLConfig + + // FCGI configuration. You can use FastCGI without HTTP server. + FCGI *FCGIConfig + + // HTTP2 configuration + HTTP2 *HTTP2Config + + // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. + MaxRequestSize int64 + + // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For + TrustedSubnets []string + cidrs []*net.IPNet + + // Uploads configures uploads configuration. + Uploads *UploadsConfig + + // Pool configures worker pool. + Pool *roadrunner.PoolConfig +} + +// FCGIConfig for FastCGI server. +type FCGIConfig struct { + // Address and port to handle as http server. + Address string +} + +// HTTP2Config HTTP/2 server customizations. +type HTTP2Config struct { + // Enable or disable HTTP/2 extension, default enable. + Enabled bool + + // H2C enables HTTP/2 over TCP + H2C bool + + // MaxConcurrentStreams defaults to 128. + MaxConcurrentStreams uint32 +} + +// InitDefaults sets default values for HTTP/2 configuration. +func (cfg *HTTP2Config) InitDefaults() error { + cfg.Enabled = true + cfg.MaxConcurrentStreams = 128 + + return nil +} + +// SSLConfig defines https server configuration. +type SSLConfig struct { + // Port to listen as HTTPS server, defaults to 443. + Port int + + // Redirect when enabled forces all http connections to switch to https. + Redirect bool + + // Key defined private server key. + Key string + + // Cert is https certificate. + Cert string + + // Root CA file + RootCA string +} + +// EnableHTTP is true when http server must run. +func (c *Config) EnableHTTP() bool { + return c.Address != "" +} + +// EnableTLS returns true if pool must listen TLS connections. +func (c *Config) EnableTLS() bool { + return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != "" +} + +// EnableHTTP2 when HTTP/2 extension must be enabled (only with TSL). +func (c *Config) EnableHTTP2() bool { + return c.HTTP2.Enabled +} + +// EnableH2C when HTTP/2 extension must be enabled on TCP. +func (c *Config) EnableH2C() bool { + return c.HTTP2.H2C +} + +// EnableFCGI is true when FastCGI server must be enabled. +func (c *Config) EnableFCGI() bool { + return c.FCGI.Address != "" +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg Config) error { + //if c.Workers == nil { + // c.Workers = &ServerConfig{} + //} + + if c.HTTP2 == nil { + c.HTTP2 = &HTTP2Config{} + } + + if c.FCGI == nil { + c.FCGI = &FCGIConfig{} + } + + if c.Uploads == nil { + c.Uploads = &UploadsConfig{} + } + + if c.SSL.Port == 0 { + c.SSL.Port = 443 + } + + err := c.HTTP2.InitDefaults() + if err != nil { + return err + } + err = c.Uploads.InitDefaults() + if err != nil { + return err + } + //err = c.Workers.InitDefaults() + //if err != nil { + // return err + //} + // + //if err := cfg.Unmarshal(c); err != nil { + // return err + //} + // + //c.Workers.UpscaleDurations() + + if c.TrustedSubnets == nil { + // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses + c.TrustedSubnets = []string{ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + } + } + + if err := c.parseCIDRs(); err != nil { + return err + } + + return c.Valid() +} + +func (c *Config) parseCIDRs() error { + for _, cidr := range c.TrustedSubnets { + _, cr, err := net.ParseCIDR(cidr) + if err != nil { + return err + } + + c.cidrs = append(c.cidrs, cr) + } + + return nil +} + +// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For +func (c *Config) IsTrusted(ip string) bool { + if c.cidrs == nil { + return false + } + + i := net.ParseIP(ip) + if i == nil { + return false + } + + for _, cird := range c.cidrs { + if cird.Contains(i) { + return true + } + } + + return false +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + if c.Uploads == nil { + return errors.New("malformed uploads config") + } + + if c.HTTP2 == nil { + return errors.New("malformed http2 config") + } + + //if c.Workers == nil { + // return errors.New("malformed workers config") + //} + // + //if c.Workers.Pool == nil { + // return errors.New("malformed workers config (pool config is missing)") + //} + + //if err := c.Workers.Pool.Valid(); err != nil { + // return err + //} + + if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() { + return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)") + } + + if c.Address != "" && !strings.Contains(c.Address, ":") { + return errors.New("malformed http server address") + } + + if c.EnableTLS() { + if _, err := os.Stat(c.SSL.Key); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("key file '%s' does not exists", c.SSL.Key) + } + + return err + } + + if _, err := os.Stat(c.SSL.Cert); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("cert file '%s' does not exists", c.SSL.Cert) + } + + return err + } + + // RootCA is optional, but if provided - check it + if c.SSL.RootCA != "" { + if _, err := os.Stat(c.SSL.RootCA); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("root ca path provided, but path '%s' does not exists", c.SSL.RootCA) + } + return err + } + } + } + + return nil +} diff --git a/plugins/http/config_test.go b/plugins/http/config_test.go new file mode 100644 index 00000000..6d23b6ca --- /dev/null +++ b/plugins/http/config_test.go @@ -0,0 +1,330 @@ +package http + +//import ( +// "os" +// "testing" +// "time" +// +// json "github.com/json-iterator/go" +// "github.com/stretchr/testify/assert" +//) +// +//type mockCfg struct{ cfg string } +// +//func (cfg *mockCfg) Get(name string) service.Config { return nil } +//func (cfg *mockCfg) Unmarshal(out interface{}) error { +// j := json.ConfigCompatibleWithStandardLibrary +// return j.Unmarshal([]byte(cfg.cfg), out) +//} +// +//func Test_Config_Hydrate_Error1(t *testing.T) { +// cfg := &mockCfg{`{"address": "localhost:8080"}`} +// c := &Config{} +// +// assert.NoError(t, c.Hydrate(cfg)) +//} +// +//func Test_Config_Hydrate_Error2(t *testing.T) { +// cfg := &mockCfg{`{"dir": "/dir/"`} +// c := &Config{} +// +// assert.Error(t, c.Hydrate(cfg)) +//} +// +//func Test_Config_Valid(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.NoError(t, cfg.Valid()) +//} +// +//func Test_Trusted_Subnets(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// TrustedSubnets: []string{"200.1.0.0/16"}, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.NoError(t, cfg.parseCIDRs()) +// +// assert.True(t, cfg.IsTrusted("200.1.0.10")) +// assert.False(t, cfg.IsTrusted("127.0.0.0.1")) +//} +// +//func Test_Trusted_Subnets_Err(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// TrustedSubnets: []string{"200.1.0.0"}, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.parseCIDRs()) +//} +// +//func Test_Config_Valid_SSL(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Cert: "fixtures/server.crt", +// Key: "fixtures/server.key", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"})) +// +// assert.NoError(t, cfg.Valid()) +// assert.True(t, cfg.EnableTLS()) +// assert.Equal(t, 443, cfg.SSL.Port) +//} +// +//func Test_Config_SSL_No_key(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Cert: "fixtures/server.crt", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_SSL_No_Cert(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// SSL: SSLConfig{ +// Key: "fixtures/server.key", +// }, +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoUploads(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoHTTP2(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 0, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoWorkers(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_NoPool(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 0, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_DeadPool(t *testing.T) { +// cfg := &Config{ +// Address: ":8080", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} +// +//func Test_Config_InvalidAddress(t *testing.T) { +// cfg := &Config{ +// Address: "unexpected_address", +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// HTTP2: &HTTP2Config{ +// Enabled: true, +// }, +// Workers: &roadrunner.ServerConfig{ +// Command: "php tests/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// }, +// } +// +// assert.Error(t, cfg.Valid()) +//} diff --git a/plugins/http/constants.go b/plugins/http/constants.go new file mode 100644 index 00000000..a25f52a4 --- /dev/null +++ b/plugins/http/constants.go @@ -0,0 +1,6 @@ +package http + +import "net/http" + +var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") +var trailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/plugins/http/errors.go b/plugins/http/errors.go new file mode 100644 index 00000000..fb8762ef --- /dev/null +++ b/plugins/http/errors.go @@ -0,0 +1,25 @@ +// +build !windows + +package http + +import ( + "errors" + "net" + "os" + "syscall" +) + +// Broken pipe +var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") + +// handleWriteError just check if error was caused by aborted connection on linux +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if syscallErr.Err == syscall.EPIPE { + return errEPIPE + } + } + } + return err +} diff --git a/plugins/http/errors_windows.go b/plugins/http/errors_windows.go new file mode 100644 index 00000000..3d0ba04c --- /dev/null +++ b/plugins/http/errors_windows.go @@ -0,0 +1,27 @@ +// +build windows + +package http + +import ( + "errors" + "net" + "os" + "syscall" +) + +//Software caused connection abort. +//An established connection was aborted by the software in your host computer, +//possibly due to a data transmission time-out or protocol error. +var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") + +// handleWriteError just check if error was caused by aborted connection on windows +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if syscallErr.Err == syscall.WSAECONNABORTED { + return errEPIPE + } + } + } + return err +} diff --git a/plugins/http/fcgi_test.go b/plugins/http/fcgi_test.go new file mode 100644 index 00000000..82b7d1c4 --- /dev/null +++ b/plugins/http/fcgi_test.go @@ -0,0 +1,104 @@ +package http + +// +//import ( +// "io/ioutil" +// "net/http/fcgi" +// "net/http/httptest" +// "testing" +// "time" +// +// "github.com/stretchr/testify/assert" +//) +// +//func Test_FCGI_Service_Echo(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "fcgi": { +// "address": "tcp://0.0.0.0:6082" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Second * 1) +// +// fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6082") +// +// fcgiHandler := gofast.NewHandler( +// gofast.BasicParamsMap(gofast.BasicSession), +// gofast.SimpleClientFactory(fcgiConnFactory, 0), +// ) +// +// w := httptest.NewRecorder() +// req := httptest.NewRequest("GET", "http://site.local/?hello=world", nil) +// fcgiHandler.ServeHTTP(w, req) +// +// body, err := ioutil.ReadAll(w.Result().Body) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, w.Result().StatusCode) +// assert.Equal(t, "WORLD", string(body)) +// c.Stop() +//} +// +//func Test_FCGI_Service_Request_Uri(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "fcgi": { +// "address": "tcp://0.0.0.0:6083" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php request-uri pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { assert.NoError(t, c.Serve()) }() +// time.Sleep(time.Second * 1) +// +// fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6083") +// +// fcgiHandler := gofast.NewHandler( +// gofast.BasicParamsMap(gofast.BasicSession), +// gofast.SimpleClientFactory(fcgiConnFactory, 0), +// ) +// +// w := httptest.NewRecorder() +// req := httptest.NewRequest("GET", "http://site.local/hello-world", nil) +// fcgiHandler.ServeHTTP(w, req) +// +// body, err := ioutil.ReadAll(w.Result().Body) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, w.Result().StatusCode) +// assert.Equal(t, "http://site.local/hello-world", string(body)) +// c.Stop() +//} diff --git a/plugins/http/fixtures/server.crt b/plugins/http/fixtures/server.crt new file mode 100644 index 00000000..24d67fd7 --- /dev/null +++ b/plugins/http/fixtures/server.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICTTCCAdOgAwIBAgIJAOKyUd+llTRKMAoGCCqGSM49BAMCMGMxCzAJBgNVBAYT +AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv +MRMwEQYDVQQKDApSb2FkUnVubmVyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTgw +OTMwMTMzNDUzWhcNMjgwOTI3MTMzNDUzWjBjMQswCQYDVQQGEwJVUzETMBEGA1UE +CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzETMBEGA1UECgwK +Um9hZFJ1bm5lcjESMBAGA1UEAwwJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EE +ACIDYgAEVnbShsM+l5RR3wfWWmGhzuFGwNzKCk7i9xyobDIyBUxG/UUSfj7KKlUX +puDnDEtF5xXcepl744CyIAYFLOXHb5WqI4jCOzG0o9f/00QQ4bQudJOdbqV910QF +C2vb7Fxro1MwUTAdBgNVHQ4EFgQU9xUexnbB6ORKayA7Pfjzs33otsAwHwYDVR0j +BBgwFoAU9xUexnbB6ORKayA7Pfjzs33otsAwDwYDVR0TAQH/BAUwAwEB/zAKBggq +hkjOPQQDAgNoADBlAjEAue3HhR/MUhxoa9tSDBtOJT3FYbDQswrsdqBTz97CGKst +e7XeZ3HMEvEXy0hGGEMhAjAqcD/4k9vViVppgWFtkk6+NFbm+Kw/QeeAiH5FgFSj +8xQcb+b7nPwNLp3JOkXkVd4= +-----END CERTIFICATE----- diff --git a/plugins/http/fixtures/server.key b/plugins/http/fixtures/server.key new file mode 100644 index 00000000..7501dd46 --- /dev/null +++ b/plugins/http/fixtures/server.key @@ -0,0 +1,9 @@ +-----BEGIN EC PARAMETERS----- +BgUrgQQAIg== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDCQP8utxNbHR6xZOLAJgUhn88r6IrPqmN0MsgGJM/jePB+T9UhkmIU8 +PMm2HeScbcugBwYFK4EEACKhZANiAARWdtKGwz6XlFHfB9ZaYaHO4UbA3MoKTuL3 +HKhsMjIFTEb9RRJ+PsoqVRem4OcMS0XnFdx6mXvjgLIgBgUs5cdvlaojiMI7MbSj +1//TRBDhtC50k51upX3XRAULa9vsXGs= +-----END EC PRIVATE KEY----- diff --git a/plugins/http/h2c_test.go b/plugins/http/h2c_test.go new file mode 100644 index 00000000..936ca8eb --- /dev/null +++ b/plugins/http/h2c_test.go @@ -0,0 +1,81 @@ +package http + +// +//import ( +// "net/http" +// "testing" +// "time" +// +// "github.com/cenkalti/backoff/v4" +// "github.com/stretchr/testify/assert" +//) +// +//func Test_Service_H2C(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "address": ":6029", +// "http2": {"h2c":true}, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error serving: %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 100) +// defer c.Stop() +// +// req, err := http.NewRequest("PRI", "http://localhost:6029?hello=world", nil) +// if err != nil { +// return err +// } +// +// req.Header.Add("Upgrade", "h2c") +// req.Header.Add("Connection", "HTTP2-Settings") +// req.Header.Add("HTTP2-Settings", "") +// +// r, err2 := http.DefaultClient.Do(req) +// if err2 != nil { +// return err2 +// } +// +// assert.Equal(t, "101 Switching Protocols", r.Status) +// +// err3 := r.Body.Close() +// if err3 != nil { +// return err3 +// } +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} diff --git a/plugins/http/handler.go b/plugins/http/handler.go new file mode 100644 index 00000000..5b612d7e --- /dev/null +++ b/plugins/http/handler.go @@ -0,0 +1,208 @@ +package http + +import ( + "fmt" + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" +) + +const ( + // EventResponse thrown after the request been processed. See ErrorEvent as payload. + EventResponse = iota + 500 + + // EventError thrown on any non job error provided by road runner server. + EventError +) + +// ErrorEvent represents singular http error event. +type ErrorEvent struct { + // Request contains client request, must not be stored. + Request *http.Request + + // Error - associated error, if any. + Error error + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ErrorEvent) Elapsed() time.Duration { + return e.elapsed +} + +// ResponseEvent represents singular http response event. +type ResponseEvent struct { + // Request contains client request, must not be stored. + Request *Request + + // Response contains service response. + Response *Response + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ResponseEvent) Elapsed() time.Duration { + return e.elapsed +} + +// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Handler struct { + cfg *Config + log log.Logger + rr roadrunner.Pool + mul sync.Mutex + lsn func(event int, ctx interface{}) +} + +// Listen attaches handler event controller. +func (h *Handler) Listen(l func(event int, ctx interface{})) { + h.mul.Lock() + defer h.mul.Unlock() + + h.lsn = l +} + +// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + // validating request size + if h.cfg.MaxRequestSize != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + h.handleError(w, r, err, start) + return + } else if size > h.cfg.MaxRequestSize*1024*1024 { + h.handleError(w, r, errors.New("request body max size is exceeded"), start) + return + } + } + } + + req, err := NewRequest(r, h.cfg.Uploads) + if err != nil { + h.handleError(w, r, err, start) + return + } + + // proxy IP resolution + h.resolveIP(req) + + req.Open(h.log) + defer req.Close(h.log) + + p, err := req.Payload() + if err != nil { + h.handleError(w, r, err, start) + return + } + + rsp, err := h.rr.Exec(p) + if err != nil { + h.handleError(w, r, err, start) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + h.handleError(w, r, err, start) + return + } + + h.handleResponse(req, resp, start) + err = resp.Write(w) + if err != nil { + h.handleError(w, r, err, start) + } +} + +// handleError sends error. +func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) { + // if pipe is broken, there is no sense to write the header + // in this case we just report about error + if err == errEPIPE { + h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) + return + } + // ResponseWriter is ok, write the error code + w.WriteHeader(500) + _, err2 := w.Write([]byte(err.Error())) + // error during the writing to the ResponseWriter + if err2 != nil { + // concat original error with ResponseWriter error + h.throw(EventError, &ErrorEvent{Request: r, Error: errors.New(fmt.Sprintf("error: %v, during handle this error, ResponseWriter error occurred: %v", err, err2)), start: start, elapsed: time.Since(start)}) + return + } + h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) +} + +// handleResponse triggers response event. +func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { + h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) +} + +// throw invokes event handler if any. +func (h *Handler) throw(event int, ctx interface{}) { + h.mul.Lock() + defer h.mul.Unlock() + + if h.lsn != nil { + h.lsn(event, ctx) + } +} + +// get real ip passing multiple proxy +func (h *Handler) resolveIP(r *Request) { + if !h.cfg.IsTrusted(r.RemoteAddr) { + return + } + + if r.Header.Get("X-Forwarded-For") != "" { + ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") + ipCount := len(ips) + + for i := ipCount - 1; i >= 0; i-- { + addr := strings.TrimSpace(ips[i]) + if net.ParseIP(addr) != nil { + r.RemoteAddr = addr + return + } + } + + return + } + + // The logic here is the following: + // In general case, we only expect X-Real-Ip header. If it exist, we get the IP addres from header and set request Remote address + // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers + // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. + // CF-Connecting-IP is an Enterprise feature and we check it last in order. + // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string + if r.Header.Get("X-Real-Ip") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip")) + return + } + + if r.Header.Get("True-Client-IP") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP")) + return + } + + if r.Header.Get("CF-Connecting-IP") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP")) + } +} diff --git a/plugins/http/handler_test.go b/plugins/http/handler_test.go new file mode 100644 index 00000000..d15cf96f --- /dev/null +++ b/plugins/http/handler_test.go @@ -0,0 +1,1962 @@ +package http + +// +//import ( +// "bytes" +// "context" +// "github.com/spiral/roadrunner" +// "github.com/stretchr/testify/assert" +// "io/ioutil" +// "mime/multipart" +// "net/http" +// "net/http/httptest" +// "net/url" +// "os" +// "runtime" +// "strings" +// "testing" +// "time" +//) +// +//// get request and return body +//func get(url string) (string, *http.Response, error) { +// r, err := http.Get(url) +// if err != nil { +// return "", nil, err +// } +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// return "", nil, err +// } +// +// err = r.Body.Close() +// if err != nil { +// return "", nil, err +// } +// return string(b), r, err +//} +// +//// get request and return body +//func getHeader(url string, h map[string]string) (string, *http.Response, error) { +// req, err := http.NewRequest("GET", url, bytes.NewBuffer(nil)) +// if err != nil { +// return "", nil, err +// } +// +// for k, v := range h { +// req.Header.Set(k, v) +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// return "", nil, err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// return "", nil, err +// } +// +// err = r.Body.Close() +// if err != nil { +// return "", nil, err +// } +// return string(b), r, err +//} +// +//func TestHandler_Echo(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// body, r, err := get("http://localhost:8177/?hello=world") +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", body) +//} +// +//func Test_HandlerErrors(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// wr := httptest.NewRecorder() +// rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("data"))) +// +// h.ServeHTTP(wr, rq) +// assert.Equal(t, 500, wr.Code) +//} +// +//func Test_Handler_JSON_error(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// wr := httptest.NewRecorder() +// rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("{sd"))) +// rq.Header.Add("Content-Type", "application/json") +// rq.Header.Add("Content-Size", "3") +// +// h.ServeHTTP(wr, rq) +// assert.Equal(t, 500, wr.Code) +//} +// +//func TestHandler_Headers(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php header pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8078", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 100) +// +// req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil) +// assert.NoError(t, err) +// +// req.Header.Add("input", "sample") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "world", r.Header.Get("Header")) +// assert.Equal(t, "SAMPLE", string(b)) +//} +// +//func TestHandler_Empty_User_Agent(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php user-agent pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8088", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) +// assert.NoError(t, err) +// +// req.Header.Add("user-agent", "") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "", string(b)) +//} +// +//func TestHandler_User_Agent(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php user-agent pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8088", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) +// assert.NoError(t, err) +// +// req.Header.Add("User-Agent", "go-agent") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "go-agent", string(b)) +//} +// +//func TestHandler_Cookies(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php cookie pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8079", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// req, err := http.NewRequest("GET", "http://localhost:8079", nil) +// assert.NoError(t, err) +// +// req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"}) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "INPUT-VALUE", string(b)) +// +// for _, c := range r.Cookies() { +// assert.Equal(t, "output", c.Name) +// assert.Equal(t, "cookie-output", c.Value) +// } +//} +// +//func TestHandler_JsonPayload_POST(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php payload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8090", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// req, err := http.NewRequest( +// "POST", +// "http://localhost"+hs.Addr, +// bytes.NewBufferString(`{"key":"value"}`), +// ) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/json") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, `{"value":"key"}`, string(b)) +//} +// +//func TestHandler_JsonPayload_PUT(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php payload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8081", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/json") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, `{"value":"key"}`, string(b)) +//} +// +//func TestHandler_JsonPayload_PATCH(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php payload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8082", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/json") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, `{"value":"key"}`, string(b)) +//} +// +//func TestHandler_FormData_POST(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8083", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// form := url.Values{} +// +// form.Add("key", "value") +// form.Add("name[]", "name1") +// form.Add("name[]", "name2") +// form.Add("name[]", "name3") +// form.Add("arr[x][y][z]", "y") +// form.Add("arr[x][y][e]", "f") +// form.Add("arr[c]p", "l") +// form.Add("arr[c]z", "") +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/x-www-form-urlencoded") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_FormData_POST_Overwrite(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8083", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// form := url.Values{} +// +// form.Add("key", "value1") +// form.Add("key", "value2") +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/x-www-form-urlencoded") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"key":"value2","arr":{"x":{"y":null}}}`, string(b)) +//} +// +//func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8083", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// form := url.Values{} +// +// form.Add("key", "value") +// form.Add("name[]", "name1") +// form.Add("name[]", "name2") +// form.Add("name[]", "name3") +// form.Add("arr[x][y][z]", "y") +// form.Add("arr[x][y][e]", "f") +// form.Add("arr[c]p", "l") +// form.Add("arr[c]z", "") +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_FormData_PUT(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8084", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// form := url.Values{} +// +// form.Add("key", "value") +// form.Add("name[]", "name1") +// form.Add("name[]", "name2") +// form.Add("name[]", "name3") +// form.Add("arr[x][y][z]", "y") +// form.Add("arr[x][y][e]", "f") +// form.Add("arr[c]p", "l") +// form.Add("arr[c]z", "") +// +// req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/x-www-form-urlencoded") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_FormData_PATCH(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8085", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// form := url.Values{} +// +// form.Add("key", "value") +// form.Add("name[]", "name1") +// form.Add("name[]", "name2") +// form.Add("name[]", "name3") +// form.Add("arr[x][y][z]", "y") +// form.Add("arr[x][y][e]", "f") +// form.Add("arr[c]p", "l") +// form.Add("arr[c]z", "") +// +// req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) +// assert.NoError(t, err) +// +// req.Header.Add("Content-Type", "application/x-www-form-urlencoded") +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_Multipart_POST(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8019", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// err := w.WriteField("key", "value") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("key", "value") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name1") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name2") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name3") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[x][y][z]", "y") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[x][y][e]", "f") +// +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[c]p", "l") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[c]z", "") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the writer: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_Multipart_PUT(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8020", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// err := w.WriteField("key", "value") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("key", "value") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name1") +// +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name2") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name3") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[x][y][z]", "y") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[x][y][e]", "f") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[c]p", "l") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[c]z", "") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the writer: error %v", err) +// } +// +// req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_Multipart_PATCH(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php data pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// err := w.WriteField("key", "value") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("key", "value") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name1") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name2") +// +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("name[]", "name3") +// +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[x][y][z]", "y") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[x][y][e]", "f") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[c]p", "l") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.WriteField("arr[c]z", "") +// if err != nil { +// t.Errorf("error writing the field: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the writer: error %v", err) +// } +// +// req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +//} +// +//func TestHandler_Error(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php error pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// _, r, err := get("http://localhost:8177/?hello=world") +// assert.NoError(t, err) +// assert.Equal(t, 500, r.StatusCode) +//} +// +//func TestHandler_Error2(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php error2 pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// _, r, err := get("http://localhost:8177/?hello=world") +// assert.NoError(t, err) +// assert.Equal(t, 500, r.StatusCode) +//} +// +//func TestHandler_Error3(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php pid pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// b2 := &bytes.Buffer{} +// for i := 0; i < 1024*1024; i++ { +// b2.Write([]byte(" ")) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2) +// assert.NoError(t, err) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error during the closing Body: error %v", err) +// +// } +// }() +// +// assert.NoError(t, err) +// assert.Equal(t, 500, r.StatusCode) +//} +// +//func TestHandler_ResponseDuration(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// gotresp := make(chan interface{}) +// h.Listen(func(event int, ctx interface{}) { +// if event == EventResponse { +// c := ctx.(*ResponseEvent) +// +// if c.Elapsed() > 0 { +// close(gotresp) +// } +// } +// }) +// +// body, r, err := get("http://localhost:8177/?hello=world") +// assert.NoError(t, err) +// +// <-gotresp +// +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", body) +//} +// +//func TestHandler_ResponseDurationDelayed(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php echoDelay pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// gotresp := make(chan interface{}) +// h.Listen(func(event int, ctx interface{}) { +// if event == EventResponse { +// c := ctx.(*ResponseEvent) +// +// if c.Elapsed() > time.Second { +// close(gotresp) +// } +// } +// }) +// +// body, r, err := get("http://localhost:8177/?hello=world") +// assert.NoError(t, err) +// +// <-gotresp +// +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", body) +//} +// +//func TestHandler_ErrorDuration(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php error pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// goterr := make(chan interface{}) +// h.Listen(func(event int, ctx interface{}) { +// if event == EventError { +// c := ctx.(*ErrorEvent) +// +// if c.Elapsed() > 0 { +// close(goterr) +// } +// } +// }) +// +// _, r, err := get("http://localhost:8177/?hello=world") +// assert.NoError(t, err) +// +// <-goterr +// +// assert.Equal(t, 500, r.StatusCode) +//} +// +//func TestHandler_IP(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// TrustedSubnets: []string{ +// "10.0.0.0/8", +// "127.0.0.0/8", +// "172.16.0.0/12", +// "192.168.0.0/16", +// "::1/128", +// "fc00::/7", +// "fe80::/10", +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php ip pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// err := h.cfg.parseCIDRs() +// if err != nil { +// t.Errorf("error parsing CIDRs: error %v", err) +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// body, r, err := get("http://127.0.0.1:8177/") +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "127.0.0.1", body) +//} +// +//func TestHandler_XRealIP(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// TrustedSubnets: []string{ +// "10.0.0.0/8", +// "127.0.0.0/8", +// "172.16.0.0/12", +// "192.168.0.0/16", +// "::1/128", +// "fc00::/7", +// "fe80::/10", +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php ip pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// err := h.cfg.parseCIDRs() +// if err != nil { +// t.Errorf("error parsing CIDRs: error %v", err) +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ +// "X-Real-Ip": "200.0.0.1", +// }) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "200.0.0.1", body) +//} +// +//func TestHandler_XForwardedFor(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// TrustedSubnets: []string{ +// "10.0.0.0/8", +// "127.0.0.0/8", +// "172.16.0.0/12", +// "192.168.0.0/16", +// "100.0.0.0/16", +// "200.0.0.0/16", +// "::1/128", +// "fc00::/7", +// "fe80::/10", +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php ip pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// err := h.cfg.parseCIDRs() +// if err != nil { +// t.Errorf("error parsing CIDRs: error %v", err) +// } +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ +// "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1", +// }) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "101.0.0.1", body) +// +// body, r, err = getHeader("http://127.0.0.1:8177/", map[string]string{ +// "X-Forwarded-For": "100.0.0.1, 200.0.0.1, 101.0.0.1, invalid", +// }) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "101.0.0.1", body) +//} +// +//func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// TrustedSubnets: []string{ +// "10.0.0.0/8", +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php ip pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// err := h.cfg.parseCIDRs() +// if err != nil { +// t.Errorf("error parsing CIDRs: error %v", err) +// } +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ +// "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1", +// }) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "127.0.0.1", body) +//} +// +//func BenchmarkHandler_Listen_Echo(b *testing.B) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php echo pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: int64(runtime.NumCPU()), +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// err := h.pool.Start() +// if err != nil { +// b.Errorf("error starting the worker pool: error %v", err) +// } +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8177", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// b.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// b.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// bb := "WORLD" +// for n := 0; n < b.N; n++ { +// r, err := http.Get("http://localhost:8177/?hello=world") +// if err != nil { +// b.Fail() +// } +// // Response might be nil here +// if r != nil { +// br, err := ioutil.ReadAll(r.Body) +// if err != nil { +// b.Errorf("error reading Body: error %v", err) +// } +// if string(br) != bb { +// b.Fail() +// } +// err = r.Body.Close() +// if err != nil { +// b.Errorf("error closing the Body: error %v", err) +// } +// } else { +// b.Errorf("got nil response") +// } +// } +//} diff --git a/plugins/http/parse.go b/plugins/http/parse.go new file mode 100644 index 00000000..9b58d328 --- /dev/null +++ b/plugins/http/parse.go @@ -0,0 +1,147 @@ +package http + +import ( + "net/http" +) + +// MaxLevel defines maximum tree depth for incoming request data and files. +const MaxLevel = 127 + +type dataTree map[string]interface{} +type fileTree map[string]interface{} + +// parseData parses incoming request body into data tree. +func parseData(r *http.Request) dataTree { + data := make(dataTree) + if r.PostForm != nil { + for k, v := range r.PostForm { + data.push(k, v) + } + } + + if r.MultipartForm != nil { + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + } + + return data +} + +// pushes value into data tree. +func (d dataTree) push(k string, v []string) { + keys := fetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(i) == 1 { + // single value context (last element) + d[i[0]] = v[len(v)-1] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + return + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} + +// parse incoming dataTree request into JSON (including contentMultipart form dataTree) +func parseUploads(r *http.Request, cfg *UploadsConfig) *Uploads { + u := &Uploads{ + cfg: cfg, + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, NewUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u +} + +// pushes new file upload into it's proper place. +func (d fileTree) push(k string, v []*FileUpload) { + keys := fetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + return + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} + +// fetchIndexes parses input name and splits it into separate indexes list. +func fetchIndexes(s string) []string { + var ( + pos int + ch string + keys = make([]string, 1) + ) + + for _, c := range s { + ch = string(c) + switch ch { + case " ": + // ignore all spaces + continue + case "[": + pos = 1 + continue + case "]": + if pos == 1 { + keys = append(keys, "") + } + pos = 2 + default: + if pos == 1 || pos == 2 { + keys = append(keys, "") + } + + keys[len(keys)-1] += ch + pos = 0 + } + } + + return keys +} diff --git a/plugins/http/parse_test.go b/plugins/http/parse_test.go new file mode 100644 index 00000000..f95a3f9d --- /dev/null +++ b/plugins/http/parse_test.go @@ -0,0 +1,52 @@ +package http + +import "testing" + +var samples = []struct { + in string + out []string +}{ + {"key", []string{"key"}}, + {"key[subkey]", []string{"key", "subkey"}}, + {"key[subkey]value", []string{"key", "subkey", "value"}}, + {"key[subkey][value]", []string{"key", "subkey", "value"}}, + {"key[subkey][value][]", []string{"key", "subkey", "value", ""}}, + {"key[subkey] [value][]", []string{"key", "subkey", "value", ""}}, + {"key [ subkey ] [ value ] [ ]", []string{"key", "subkey", "value", ""}}, +} + +func Test_FetchIndexes(t *testing.T) { + for _, tt := range samples { + t.Run(tt.in, func(t *testing.T) { + r := fetchIndexes(tt.in) + if !same(r, tt.out) { + t.Errorf("got %q, want %q", r, tt.out) + } + }) + } +} + +func BenchmarkConfig_FetchIndexes(b *testing.B) { + for _, tt := range samples { + for n := 0; n < b.N; n++ { + r := fetchIndexes(tt.in) + if !same(r, tt.out) { + b.Fail() + } + } + } +} + +func same(in, out []string) bool { + if len(in) != len(out) { + return false + } + + for i, v := range in { + if v != out[i] { + return false + } + } + + return true +} diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go new file mode 100644 index 00000000..581455b3 --- /dev/null +++ b/plugins/http/plugin.go @@ -0,0 +1,470 @@ +package http + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "net/http" + "net/http/fcgi" + "net/url" + "strings" + "sync" + + "github.com/hashicorp/go-multierror" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" + factory "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/util" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "golang.org/x/sys/cpu" +) + +const ( + // ID contains default service name. + ServiceName = "http" + + // EventInitSSL thrown at moment of https initialization. SSL server passed as context. + EventInitSSL = 750 +) + +//var couldNotAppendPemError = errors.New("could not append Certs from PEM") + +// http middleware type. +type middleware func(f http.HandlerFunc) http.HandlerFunc + +// Service manages pool, http servers. +type Plugin struct { + sync.Mutex + sync.WaitGroup + + cfg *Config + configurer config.Configurer + log log.Logger + + mdwr []middleware + listeners []util.EventListener + + pool roadrunner.Pool + server factory.Server + //controller roadrunner.Controller + handler *Handler + + http *http.Server + https *http.Server + fcgi *http.Server +} + +// AddMiddleware adds new net/http mdwr. +func (s *Plugin) AddMiddleware(m middleware) { + s.mdwr = append(s.mdwr, m) +} + +// AddListener attaches server event controller. +func (s *Plugin) AddListener(listener util.EventListener) { + // save listeners for Reset + s.listeners = append(s.listeners, listener) + s.pool.AddListener(listener) +} + +// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error { + const op = errors.Op("http Init") + err := cfg.UnmarshalKey(ServiceName, &s.cfg) + if err != nil { + return errors.E(op, err) + } + + s.configurer = cfg + s.listeners = make([]util.EventListener, 0, 1) + s.log = log + + // Set needed env vars + env := make(map[string]string) + env["RR_HTTP"] = "true" + + p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: s.cfg.Pool.Debug, + NumWorkers: s.cfg.Pool.NumWorkers, + MaxJobs: s.cfg.Pool.MaxJobs, + AllocateTimeout: s.cfg.Pool.AllocateTimeout, + DestroyTimeout: s.cfg.Pool.DestroyTimeout, + Supervisor: nil, + }, env) + + if err != nil { + return errors.E(op, err) + } + + s.pool = p + + //if r != nil { + // if err := r.Register(ID, &rpcServer{s}); err != nil { + // return false, err + // } + //} + // + //if !cfg.EnableHTTP() && !cfg.EnableTLS() && !cfg.EnableFCGI() { + // return false, nil + //} + + return nil +} + +// Serve serves the svc. +func (s *Plugin) Serve() chan error { + s.Lock() + defer s.Unlock() + + const op = errors.Op("serve http") + errCh := make(chan error, 2) + + //if s.env != nil { + // if err := s.env.Copy(s.cfg.Workers); err != nil { + // return nil + // } + //} + // + //s.cfg.Workers.CommandProducer = s.cprod + //s.cfg.Workers.SetEnv("RR_HTTP", "true") + // + //s.pool = roadrunner.NewServer(s.cfg.Workers) + //s.pool.Listen(s.throw) + // + //if s.controller != nil { + // s.pool.Attach(s.controller) + //} + + s.handler = &Handler{cfg: s.cfg, rr: s.pool} + //s.handler.Listen(s.throw) + + if s.cfg.EnableHTTP() { + if s.cfg.EnableH2C() { + s.http = &http.Server{Addr: s.cfg.Address, Handler: h2c.NewHandler(s, &http2.Server{})} + } else { + s.http = &http.Server{Addr: s.cfg.Address, Handler: s} + } + } + + if s.cfg.EnableTLS() { + s.https = s.initSSL() + if s.cfg.SSL.RootCA != "" { + err := s.appendRootCa() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + } + + if s.cfg.EnableHTTP2() { + if err := s.initHTTP2(); err != nil { + errCh <- errors.E(op, err) + return errCh + } + } + } + + if s.cfg.EnableFCGI() { + s.fcgi = &http.Server{Handler: s} + } + + //if err := s.pool.Start(); err != nil { + // return err + //} + //defer s.pool.Stop() + + if s.http != nil { + go func() { + httpErr := s.http.ListenAndServe() + if httpErr != nil && httpErr != http.ErrServerClosed { + errCh <- errors.E(op, httpErr) + return + } + return + }() + } + + if s.https != nil { + go func() { + httpErr := s.https.ListenAndServeTLS( + s.cfg.SSL.Cert, + s.cfg.SSL.Key, + ) + + if httpErr != nil && httpErr != http.ErrServerClosed { + errCh <- errors.E(op, httpErr) + return + } + return + }() + } + + if s.fcgi != nil { + go func() { + httpErr := s.serveFCGI() + if httpErr != nil && httpErr != http.ErrServerClosed { + errCh <- errors.E(op, httpErr) + return + } + return + }() + } + + return errCh +} + +// Stop stops the http. +func (s *Plugin) Stop() error { + s.Lock() + defer s.Unlock() + + var err error + if s.fcgi != nil { + err = s.fcgi.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the fcgi server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } + } + + if s.https != nil { + err = s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the https server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } + } + + if s.http != nil { + err = s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the http server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } + } + + return err +} + +// ServeHTTP handles connection using set of middleware and pool PSR-7 server. +func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { + target := &url.URL{ + Scheme: "https", + Host: s.tlsAddr(r.Host, false), + Path: r.URL.Path, + RawQuery: r.URL.RawQuery, + } + + http.Redirect(w, r, target.String(), http.StatusTemporaryRedirect) + return + } + + if s.https != nil && r.TLS != nil { + w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") + } + + //r = attributes.Init(r) + + // chaining middleware + f := s.handler.ServeHTTP + for _, m := range s.mdwr { + f = m(f) + } + f(w, r) +} + +// append RootCA to the https server TLS config +func (s *Plugin) appendRootCa() error { + const op = errors.Op("append root CA") + rootCAs, err := x509.SystemCertPool() + if err != nil { + //s.throw(EventInitSSL, nil) + return nil + } + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + + CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA) + if err != nil { + //s.throw(EventInitSSL, nil) + return err + } + + // should append our CA cert + ok := rootCAs.AppendCertsFromPEM(CA) + if !ok { + return errors.E(op, errors.Str("could not append Certs from PEM")) + } + cfg := &tls.Config{ + InsecureSkipVerify: false, + RootCAs: rootCAs, + } + s.http.TLSConfig = cfg + + return nil +} + +// Init https server +func (s *Plugin) initSSL() *http.Server { + var topCipherSuites []uint16 + var defaultCipherSuitesTLS13 []uint16 + + hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ + hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL + // Keep in sync with crypto/aes/cipher_s390x.go. + hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) + + hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X + + if hasGCMAsm { + // If AES-GCM hardware is provided then prioritise AES-GCM + // cipher suites. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } else { + // Without AES-GCM hardware, we put the ChaCha20-Poly1305 + // cipher suites first. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } + + DefaultCipherSuites := make([]uint16, 0, 22) + DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) + DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) + + server := &http.Server{ + Addr: s.tlsAddr(s.cfg.Address, true), + Handler: s, + TLSConfig: &tls.Config{ + CurvePreferences: []tls.CurveID{ + tls.CurveP256, + tls.CurveP384, + tls.CurveP521, + tls.X25519, + }, + CipherSuites: DefaultCipherSuites, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + }, + } + + return server +} + +// init http/2 server +func (s *Plugin) initHTTP2() error { + return http2.ConfigureServer(s.https, &http2.Server{ + MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams, + }) +} + +// serveFCGI starts FastCGI server. +func (s *Plugin) serveFCGI() error { + l, err := util.CreateListener(s.cfg.FCGI.Address) + if err != nil { + return err + } + + err = fcgi.Serve(l, s.fcgi.Handler) + if err != nil { + return err + } + + return nil +} + +// throw handles service, server and pool events. +//func (s *Plugin) throw(event int, ctx interface{}) { +// for _, l := range s.lsns { +// l(event, ctx) +// } +// +// if event == roadrunner.EventServerFailure { +// // underlying pool server is dead +// s.Stop() +// } +//} + +// tlsAddr replaces listen or host port with port configured by SSL config. +func (s *Plugin) tlsAddr(host string, forcePort bool) string { + // remove current forcePort first + host = strings.Split(host, ":")[0] + + if forcePort || s.cfg.SSL.Port != 443 { + host = fmt.Sprintf("%s:%v", host, s.cfg.SSL.Port) + } + + return host +} + +// Server returns associated pool workers +func (s *Plugin) Workers() []roadrunner.WorkerBase { + return s.pool.Workers() +} + +func (s *Plugin) Reset() error { + s.Lock() + defer s.Unlock() + s.pool.Destroy(context.Background()) + + // Set needed env vars + env := make(map[string]string) + env["RR_HTTP"] = "true" + var err error + + // re-read the config + err = s.configurer.UnmarshalKey(ServiceName, &s.cfg) + if err != nil { + return err + } + + s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: false, + NumWorkers: 0, + MaxJobs: 0, + AllocateTimeout: 0, + DestroyTimeout: 0, + Supervisor: nil, + }, env) + if err != nil { + return err + } + + // restore original listeners + for i := 0; i < len(s.listeners); i++ { + s.pool.AddListener(s.listeners[i]) + } + return nil +} diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go new file mode 100644 index 00000000..012cea04 --- /dev/null +++ b/plugins/http/plugin_test.go @@ -0,0 +1,759 @@ +package http + +//import ( +// "github.com/cenkalti/backoff/v4" +// json "github.com/json-iterator/go" +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner" +// "github.com/spiral/roadrunner/service" +// "github.com/spiral/roadrunner/service/env" +// "github.com/spiral/roadrunner/service/rpc" +// "github.com/stretchr/testify/assert" +// "io/ioutil" +// "net/http" +// "os" +// "testing" +// "time" +//) +// +//type testCfg struct { +// httpCfg string +// rpcCfg string +// envCfg string +// target string +//} +// +//func (cfg *testCfg) Get(name string) service.Config { +// if name == ID { +// if cfg.httpCfg == "" { +// return nil +// } +// +// return &testCfg{target: cfg.httpCfg} +// } +// +// if name == rpc.ID { +// return &testCfg{target: cfg.rpcCfg} +// } +// +// if name == env.ID { +// return &testCfg{target: cfg.envCfg} +// } +// +// return nil +//} +//func (cfg *testCfg) Unmarshal(out interface{}) error { +// j := json.ConfigCompatibleWithStandardLibrary +// return j.Unmarshal([]byte(cfg.target), out) +//} +// +//func Test_Service_NoConfig(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{"Enable":true}`}) +// assert.Error(t, err) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusInactive, st) +//} +// +//func Test_Service_Configure_Disable(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusInactive, st) +//} +// +//func Test_Service_Configure_Enable(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":8070", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_Echo(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6536", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 100) +// +// req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// c.Stop() +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Env(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(env.ID, env.NewService(map[string]string{"pool": "test"})) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":10031", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php env pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`, envCfg: `{"env_key":"ENV_VALUE"}`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:10031", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "ENV_VALUE", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// c.Stop() +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_ErrorEcho(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6030", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echoerr pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// goterr := make(chan interface{}) +// s.(*Service).AddListener(func(event int, ctx interface{}) { +// if event == roadrunner.EventStderrOutput { +// if string(ctx.([]byte)) == "WORLD\n" { +// goterr <- nil +// } +// } +// }) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// <-goterr +// +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// c.Stop() +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Middleware(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6032", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { +// return func(w http.ResponseWriter, r *http.Request) { +// if r.URL.Path == "/halt" { +// w.WriteHeader(500) +// _, err := w.Write([]byte("halted")) +// if err != nil { +// t.Errorf("error writing the data to the http reply: error %v", err) +// } +// } else { +// f(w, r) +// } +// } +// }) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err = http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// b, err = ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// assert.Equal(t, 500, r.StatusCode) +// assert.Equal(t, "halted", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// c.Stop() +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_Listener(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6033", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// stop := make(chan interface{}) +// s.(*Service).AddListener(func(event int, ctx interface{}) { +// if event == roadrunner.EventServerStart { +// stop <- nil +// } +// }) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// c.Stop() +// assert.True(t, true) +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Error(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6034", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "---", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// // assert error +// err = c.Serve() +// if err == nil { +// return err +// } +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Error2(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6035", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php broken pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// // assert error +// err = c.Serve() +// if err == nil { +// return err +// } +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Error3(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6036", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers" +// "command": "php ../../tests/http/client.php broken pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// // assert error +// if err == nil { +// return err +// } +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_Error4(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": "----", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php broken pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// // assert error +// if err != nil { +// return nil +// } +// +// return err +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func tmpDir() string { +// p := os.TempDir() +// j := json.ConfigCompatibleWithStandardLibrary +// r, _ := j.Marshal(p) +// +// return string(r) +//} diff --git a/plugins/http/request.go b/plugins/http/request.go new file mode 100644 index 00000000..7e9839b2 --- /dev/null +++ b/plugins/http/request.go @@ -0,0 +1,183 @@ +package http + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/log" +) + +const ( + defaultMaxMemory = 32 << 20 // 32 MB + contentNone = iota + 900 + contentStream + contentMultipart + contentFormData +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. + RemoteAddr string `json:"remoteAddr"` + + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // URI contains full request URI with scheme and query. + URI string `json:"uri"` + + // Header contains list of request headers. + Header http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. + Attributes map[string]interface{} `json:"attributes"` + + // request body can be parsedData or []byte + body interface{} +} + +func fetchIP(pair string) string { + if !strings.ContainsRune(pair, ':') { + return pair + } + + addr, _, _ := net.SplitHostPort(pair) + return addr +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request, cfg *UploadsConfig) (*Request, error) { + req := &Request{ + RemoteAddr: fetchIP(r.RemoteAddr), + Protocol: r.Proto, + Method: r.Method, + URI: uri(r), + Header: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + //Attributes: attributes.All(r), + } + + for _, c := range r.Cookies() { + if v, err := url.QueryUnescape(c.Value); err == nil { + req.Cookies[c.Name] = v + } + } + + switch req.contentType() { + case contentNone: + return req, nil + + case contentStream: + var err error + req.body, err = ioutil.ReadAll(r.Body) + return req, err + + case contentMultipart: + if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + req.Uploads = parseUploads(r, cfg) + fallthrough + case contentFormData: + if err := r.ParseForm(); err != nil { + return nil, err + } + + req.body = parseData(r) + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open(log log.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Open(log) +} + +// Close clears all temp file uploads +func (r *Request) Close(log log.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Clear(log) +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (roadrunner.Payload, error) { + p := roadrunner.Payload{} + + var err error + if p.Context, err = json.Marshal(r); err != nil { + return roadrunner.EmptyPayload, err + } + + if r.Parsed { + if p.Body, err = json.Marshal(r.body); err != nil { + return roadrunner.EmptyPayload, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// contentType returns the payload content type. +func (r *Request) contentType() int { + if r.Method == "HEAD" || r.Method == "OPTIONS" { + return contentNone + } + + ct := r.Header.Get("content-type") + if strings.Contains(ct, "application/x-www-form-urlencoded") { + return contentFormData + } + + if strings.Contains(ct, "multipart/form-data") { + return contentMultipart + } + + return contentStream +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.URL.Host != "" { + return r.URL.String() + } + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/plugins/http/response.go b/plugins/http/response.go new file mode 100644 index 00000000..9ebc0632 --- /dev/null +++ b/plugins/http/response.go @@ -0,0 +1,105 @@ +package http + +import ( + "io" + "net/http" + "strings" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2" +) + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Header contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated body payload. + body interface{} +} + +// NewResponse creates new response based on given pool payload. +func NewResponse(p roadrunner.Payload) (*Response, error) { + r := &Response{body: p.Body} + j := json.ConfigCompatibleWithStandardLibrary + if err := j.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) error { + // INFO map is the reference type in golang + p := handlePushHeaders(r.Headers) + if pusher, ok := w.(http.Pusher); ok { + for _, v := range p { + err := pusher.Push(v, nil) + if err != nil { + return err + } + } + } + + handleTrailers(r.Headers) + for n, h := range r.Headers { + for _, v := range h { + w.Header().Add(n, v) + } + } + + w.WriteHeader(r.Status) + + if data, ok := r.body.([]byte); ok { + _, err := w.Write(data) + if err != nil { + return handleWriteError(err) + } + } + + if rc, ok := r.body.(io.Reader); ok { + if _, err := io.Copy(w, rc); err != nil { + return err + } + } + + return nil +} + +func handlePushHeaders(h map[string][]string) []string { + var p []string + pushHeader, ok := h[http2pushHeaderKey] + if !ok { + return p + } + + p = append(p, pushHeader...) + + delete(h, http2pushHeaderKey) + + return p +} + +func handleTrailers(h map[string][]string) { + trailers, ok := h[trailerHeaderKey] + if !ok { + return + } + + for _, tr := range trailers { + for _, n := range strings.Split(tr, ",") { + n = strings.Trim(n, "\t ") + if v, ok := h[n]; ok { + h["Trailer:"+n] = v + + delete(h, n) + } + } + } + + delete(h, trailerHeaderKey) +} diff --git a/plugins/http/response_test.go b/plugins/http/response_test.go new file mode 100644 index 00000000..b5adbad9 --- /dev/null +++ b/plugins/http/response_test.go @@ -0,0 +1,162 @@ +package http + +import ( + "bytes" + "errors" + "net/http" + "testing" + + "github.com/spiral/roadrunner/v2" + "github.com/stretchr/testify/assert" +) + +type testWriter struct { + h http.Header + buf bytes.Buffer + wroteHeader bool + code int + err error + pushErr error + pushes []string +} + +func (tw *testWriter) Header() http.Header { return tw.h } + +func (tw *testWriter) Write(p []byte) (int, error) { + if !tw.wroteHeader { + tw.WriteHeader(http.StatusOK) + } + + n, e := tw.buf.Write(p) + if e == nil { + e = tw.err + } + + return n, e +} + +func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code } + +func (tw *testWriter) Push(target string, opts *http.PushOptions) error { + tw.pushes = append(tw.pushes, target) + + return tw.pushErr +} + +func TestNewResponse_Error(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)}) + assert.Error(t, err) + assert.Nil(t, r) +} + +func TestNewResponse_Write(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + Body: []byte(`sample body`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, 301, w.code) + assert.Equal(t, "value", w.h.Get("key")) + assert.Equal(t, "sample body", w.buf.String()) +} + +func TestNewResponse_Stream(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + }) + + // r is pointer, so, it might be nil + if r == nil { + t.Fatal("response is nil") + } + + r.body = &bytes.Buffer{} + r.body.(*bytes.Buffer).WriteString("hello world") + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, 301, w.code) + assert.Equal(t, "value", w.h.Get("key")) + assert.Equal(t, "hello world", w.buf.String()) +} + +func TestNewResponse_StreamError(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + }) + + // r is pointer, so, it might be nil + if r == nil { + t.Fatal("response is nil") + } + + r.body = &bytes.Buffer{} + r.body.(*bytes.Buffer).WriteString("hello world") + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string)), err: errors.New("error")} + assert.Error(t, r.Write(w)) +} + +func TestWrite_HandlesPush(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{ + Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Nil(t, w.h["Http2-Push"]) + assert.Equal(t, []string{"/test.js"}, w.pushes) +} + +func TestWrite_HandlesTrailers(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{ + Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Nil(t, w.h[trailerHeaderKey]) + assert.Nil(t, w.h["foo"]) //nolint:golint,staticcheck + assert.Nil(t, w.h["baz"]) //nolint:golint,staticcheck + + assert.Equal(t, "test", w.h.Get("Trailer:foo")) + assert.Equal(t, "demo", w.h.Get("Trailer:bar")) +} + +func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { + r, err := NewResponse(roadrunner.Payload{ + Context: []byte( + `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, "a", w.h.Get("Trailer:foo")) + assert.Equal(t, "b", w.h.Get("Trailer:bar")) + assert.Equal(t, "c", w.h.Get("Trailer:baz")) +} diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go new file mode 100644 index 00000000..56d8f1a1 --- /dev/null +++ b/plugins/http/rpc.go @@ -0,0 +1,34 @@ +package http + +//import ( +// "github.com/pkg/errors" +// "github.com/spiral/roadrunner/util" +//) + +//type rpcServer struct{ svc *Plugin } +// +//// WorkerList contains list of workers. +//type WorkerList struct { +// // Workers is list of workers. +// Workers []*util.State `json:"workers"` +//} +// +//// Reset resets underlying RR worker pool and restarts all of it's workers. +//func (rpc *rpcServer) Reset(reset bool, r *string) error { +// if rpc.svc == nil || rpc.svc.handler == nil { +// return errors.New("http server is not running") +// } +// +// *r = "OK" +// return rpc.svc.Server().Reset() +//} +// +//// Workers returns list of active workers and their stats. +//func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { +// if rpc.svc == nil || rpc.svc.handler == nil { +// return errors.New("http server is not running") +// } +// +// r.Workers, err = util.ServerState(rpc.svc.Server()) +// return err +//} diff --git a/plugins/http/rpc_test.go b/plugins/http/rpc_test.go new file mode 100644 index 00000000..86499d46 --- /dev/null +++ b/plugins/http/rpc_test.go @@ -0,0 +1,222 @@ +package http + +// +//import ( +// json "github.com/json-iterator/go" +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner/service" +// "github.com/spiral/roadrunner/service/rpc" +// "github.com/stretchr/testify/assert" +// "os" +// "strconv" +// "testing" +// "time" +//) +// +//func Test_RPC(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Service{}) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, +// httpCfg: `{ +// "enable": true, +// "address": ":16031", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php pid pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`})) +// +// s, _ := c.Get(ID) +// ss := s.(*Service) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Service) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Second) +// +// res, _, err := get("http://localhost:16031") +// if err != nil { +// t.Fatal(err) +// } +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res) +// +// cl, err := rs.Client() +// assert.NoError(t, err) +// +// r := "" +// assert.NoError(t, cl.Call("http.Reset", true, &r)) +// assert.Equal(t, "OK", r) +// +// res2, _, err := get("http://localhost:16031") +// if err != nil { +// t.Fatal(err) +// } +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2) +// assert.NotEqual(t, res, res2) +// c.Stop() +//} +// +//func Test_RPC_Unix(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Service{}) +// c.Register(ID, &Service{}) +// +// sock := `unix://` + os.TempDir() + `/rpc.unix` +// j := json.ConfigCompatibleWithStandardLibrary +// data, _ := j.Marshal(sock) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":` + string(data) + `}`, +// httpCfg: `{ +// "enable": true, +// "address": ":6032", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php pid pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`})) +// +// s, _ := c.Get(ID) +// ss := s.(*Service) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Service) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// res, _, err := get("http://localhost:6032") +// if err != nil { +// c.Stop() +// t.Fatal(err) +// } +// if ss.pool.Workers() != nil && len(ss.pool.Workers()) > 0 { +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res) +// } else { +// c.Stop() +// t.Fatal("no workers initialized") +// } +// +// cl, err := rs.Client() +// if err != nil { +// c.Stop() +// t.Fatal(err) +// } +// +// r := "" +// assert.NoError(t, cl.Call("http.Reset", true, &r)) +// assert.Equal(t, "OK", r) +// +// res2, _, err := get("http://localhost:6032") +// if err != nil { +// c.Stop() +// t.Fatal(err) +// } +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2) +// assert.NotEqual(t, res, res2) +// c.Stop() +//} +// +//func Test_Workers(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Service{}) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`, +// httpCfg: `{ +// "enable": true, +// "address": ":6033", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php pid pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`})) +// +// s, _ := c.Get(ID) +// ss := s.(*Service) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Service) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// cl, err := rs.Client() +// assert.NoError(t, err) +// +// r := &WorkerList{} +// assert.NoError(t, cl.Call("http.Workers", true, &r)) +// assert.Len(t, r.Workers, 1) +// +// assert.Equal(t, *ss.pool.Workers()[0].Pid, r.Workers[0].Pid) +// c.Stop() +//} +// +//func Test_Errors(t *testing.T) { +// r := &rpcServer{nil} +// +// assert.Error(t, r.Reset(true, nil)) +// assert.Error(t, r.Workers(true, nil)) +//} diff --git a/plugins/http/ssl_test.go b/plugins/http/ssl_test.go new file mode 100644 index 00000000..df09aef5 --- /dev/null +++ b/plugins/http/ssl_test.go @@ -0,0 +1,255 @@ +package http + +// +//import ( +// "crypto/tls" +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner/service" +// "github.com/stretchr/testify/assert" +// "io/ioutil" +// "net/http" +// "testing" +// "time" +//) +// +//var sslClient = &http.Client{ +// Transport: &http.Transport{ +// TLSClientConfig: &tls.Config{ +// InsecureSkipVerify: true, +// }, +// }, +//} +// +//func Test_SSL_Service_Echo(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6029", +// "ssl": { +// "port": 6900, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// +// c.Stop() +//} +// +//func Test_SSL_Service_NoRedirect(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6030", +// "ssl": { +// "port": 6901, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// +// assert.Nil(t, r.TLS) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// c.Stop() +//} +// +//func Test_SSL_Service_Redirect(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6831", +// "ssl": { +// "port": 6902, +// "redirect": true, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// assert.NotNil(t, r.TLS) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// c.Stop() +//} +// +//func Test_SSL_Service_Push(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6032", +// "ssl": { +// "port": 6903, +// "redirect": true, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php push pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// +// assert.NotNil(t, r.TLS) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.Equal(t, "", r.Header.Get("Http2-Push")) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// c.Stop() +//} diff --git a/plugins/http/test/.rr-http.yaml b/plugins/http/test/.rr-http.yaml new file mode 100644 index 00000000..6fbfd378 --- /dev/null +++ b/plugins/http/test/.rr-http.yaml @@ -0,0 +1,37 @@ +server: + command: "php psr-worker.php" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relayTimeout: "20s" + +http: + debug: true + address: 0.0.0.0:8080 + maxRequestSize: 200 + middleware: [ "" ] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60s + destroyTimeout: 60s + + # ssl: + # port: 443 + # redirect: true + # cert: server.crt + # key: server.key + # rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: false + h2c: false + maxConcurrentStreams: 128 + + diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go new file mode 100644 index 00000000..07925d33 --- /dev/null +++ b/plugins/http/test/http_test.go @@ -0,0 +1,72 @@ +package test + +import ( + "os" + "os/signal" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + //rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestHTTPInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, "")) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: ".rr-http.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + //&rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &http.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } +} diff --git a/plugins/http/test/psr-worker.php b/plugins/http/test/psr-worker.php new file mode 100644 index 00000000..65fc6bde --- /dev/null +++ b/plugins/http/test/psr-worker.php @@ -0,0 +1,23 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/../../vendor_php/autoload.php"; + +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$psr7 = new RoadRunner\PSR7Client($worker); + +while ($req = $psr7->acceptRequest()) { + try { + $resp = new \Zend\Diactoros\Response(); + $resp->getBody()->write("hello world"); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +}
\ No newline at end of file diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go new file mode 100644 index 00000000..2a1524ef --- /dev/null +++ b/plugins/http/uploads.go @@ -0,0 +1,160 @@ +package http + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/interfaces/log" + + "io" + "io/ioutil" + "mime/multipart" + "os" + "sync" +) + +const ( + // UploadErrorOK - no error, the file uploaded with success. + UploadErrorOK = 0 + + // UploadErrorNoFile - no file was uploaded. + UploadErrorNoFile = 4 + + // UploadErrorNoTmpDir - missing a temporary folder. + UploadErrorNoTmpDir = 5 + + // UploadErrorCantWrite - failed to write file to disk. + UploadErrorCantWrite = 6 + + // UploadErrorExtension - forbidden file extension. + UploadErrorExtension = 7 +) + +// Uploads tree manages uploaded files tree and temporary files. +type Uploads struct { + // associated temp directory and forbidden extensions. + cfg *UploadsConfig + + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + j := json.ConfigCompatibleWithStandardLibrary + return j.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. +func (u *Uploads) Open(log log.Logger) { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + err := f.Open(u.cfg) + if err != nil && log != nil { + log.Error("error opening the file", "err", err) + } + }(f) + } + + wg.Wait() +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear(log log.Logger) { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + err := os.Remove(f.TempFilename) + if err != nil && log != nil { + log.Error("error removing the file", "err", err) + } + } + } +} + +// FileUpload represents singular file NewUpload. +type FileUpload struct { + // ID contains filename specified by the client. + Name string `json:"name"` + + // Mime contains mime-type provided by the client. + Mime string `json:"mime"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +// NewUpload wraps net/http upload into PRS-7 compatible structure. +func NewUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + Mime: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +// Open moves file content into temporary file available for PHP. +// NOTE: +// There is 2 deferred functions, and in case of getting 2 errors from both functions +// error from close of temp file would be overwritten by error from the main file +// STACK +// DEFER FILE CLOSE (2) +// DEFER TMP CLOSE (1) +func (f *FileUpload) Open(cfg *UploadsConfig) (err error) { + if cfg.Forbids(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + + defer func() { + // close the main file + err = file.Close() + }() + + tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer func() { + // close the temp file + err = tmp.Close() + }() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} + +// exists if file exists. +func exists(path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + return true +} diff --git a/plugins/http/uploads_config.go b/plugins/http/uploads_config.go new file mode 100644 index 00000000..3f655064 --- /dev/null +++ b/plugins/http/uploads_config.go @@ -0,0 +1,45 @@ +package http + +import ( + "os" + "path" + "strings" +) + +// UploadsConfig describes file location and controls access to them. +type UploadsConfig struct { + // Dir contains name of directory to control access to. + Dir string + + // Forbid specifies list of file extensions which are forbidden for access. + // Example: .php, .exe, .bat, .htaccess and etc. + Forbid []string +} + +// InitDefaults sets missing values to their default values. +func (cfg *UploadsConfig) InitDefaults() error { + cfg.Forbid = []string{".php", ".exe", ".bat"} + return nil +} + +// TmpDir returns temporary directory. +func (cfg *UploadsConfig) TmpDir() string { + if cfg.Dir != "" { + return cfg.Dir + } + + return os.TempDir() +} + +// Forbids must return true if file extension is not allowed for the upload. +func (cfg *UploadsConfig) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.Forbid { + if ext == v { + return true + } + } + + return false +} diff --git a/plugins/http/uploads_config_test.go b/plugins/http/uploads_config_test.go new file mode 100644 index 00000000..2b6ceebc --- /dev/null +++ b/plugins/http/uploads_config_test.go @@ -0,0 +1,24 @@ +package http + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestFsConfig_Forbids(t *testing.T) { + cfg := UploadsConfig{Forbid: []string{".php"}} + + assert.True(t, cfg.Forbids("index.php")) + assert.True(t, cfg.Forbids("index.PHP")) + assert.True(t, cfg.Forbids("phpadmin/index.bak.php")) + assert.False(t, cfg.Forbids("index.html")) +} + +func TestFsConfig_TmpFallback(t *testing.T) { + cfg := UploadsConfig{Dir: "test"} + assert.Equal(t, "test", cfg.TmpDir()) + + cfg = UploadsConfig{Dir: ""} + assert.Equal(t, os.TempDir(), cfg.TmpDir()) +} diff --git a/plugins/http/uploads_test.go b/plugins/http/uploads_test.go new file mode 100644 index 00000000..b023b28f --- /dev/null +++ b/plugins/http/uploads_test.go @@ -0,0 +1,435 @@ +package http + +// +//import ( +// "bytes" +// "context" +// "crypto/md5" +// "encoding/hex" +// "fmt" +// "io" +// "io/ioutil" +// "mime/multipart" +// "net/http" +// "os" +// "testing" +// "time" +// +// json "github.com/json-iterator/go" +// "github.com/stretchr/testify/assert" +//) +// +//func TestHandler_Upload_File(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 0, "application/octet-stream") +// +// assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +//} +// +//func TestHandler_Upload_NestedFile(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 0, "application/octet-stream") +// +// assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b)) +//} +// +//func TestHandler_Upload_File_NoTmpDir(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: "-----", +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 5, "application/octet-stream") +// +// assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +//} +// +//func TestHandler_Upload_File_Forbids(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 7, "application/octet-stream") +// +// assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +//} +// +//func Test_FileExists(t *testing.T) { +// assert.True(t, exists("uploads_test.go")) +// assert.False(t, exists("uploads_test.")) +//} +// +//func mustOpen(f string) *os.File { +// r, err := os.Open(f) +// if err != nil { +// panic(err) +// } +// return r +//} +// +//type fInfo struct { +// Name string `json:"name"` +// Size int64 `json:"size"` +// Mime string `json:"mime"` +// Error int `json:"error"` +// MD5 string `json:"md5,omitempty"` +//} +// +//func fileString(f string, errNo int, mime string) string { +// s, err := os.Stat(f) +// if err != nil { +// fmt.Println(fmt.Errorf("error stat the file, error: %v", err)) +// } +// +// ff, err := os.Open(f) +// if err != nil { +// fmt.Println(fmt.Errorf("error opening the file, error: %v", err)) +// } +// +// defer func() { +// er := ff.Close() +// if er != nil { +// fmt.Println(fmt.Errorf("error closing the file, error: %v", er)) +// } +// }() +// +// h := md5.New() +// _, err = io.Copy(h, ff) +// if err != nil { +// fmt.Println(fmt.Errorf("error copying the file, error: %v", err)) +// } +// +// v := &fInfo{ +// Name: s.Name(), +// Size: s.Size(), +// Error: errNo, +// Mime: mime, +// MD5: hex.EncodeToString(h.Sum(nil)), +// } +// +// if errNo != 0 { +// v.MD5 = "" +// v.Size = 0 +// } +// +// j := json.ConfigCompatibleWithStandardLibrary +// r, err := j.Marshal(v) +// if err != nil { +// fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err)) +// } +// return string(r) +// +//} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e096708a..4d606390 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -25,49 +25,47 @@ type Plugin struct { } // Init application provider. -func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error { const op = errors.Op("Init") - err := cfg.UnmarshalKey(ServiceName, &app.cfg) + err := cfg.UnmarshalKey(ServiceName, &server.cfg) if err != nil { return errors.E(op, errors.Init, err) } - app.cfg.InitDefaults() - app.log = log + server.cfg.InitDefaults() + server.log = log + + server.factory, err = server.initFactory() + if err != nil { + return errors.E(errors.Op("Init factory"), err) + } return nil } // Name contains service name. -func (app *Plugin) Name() string { +func (server *Plugin) Name() string { return ServiceName } -func (app *Plugin) Serve() chan error { +func (server *Plugin) Serve() chan error { errCh := make(chan error, 1) - var err error - - app.factory, err = app.initFactory() - if err != nil { - errCh <- errors.E(errors.Op("init factory"), err) - } - return errCh } -func (app *Plugin) Stop() error { - if app.factory == nil { +func (server *Plugin) Stop() error { + if server.factory == nil { return nil } - return app.factory.Close(context.Background()) + return server.factory.Close(context.Background()) } // CmdFactory provides worker command factory assocated with given context. -func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { +func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) return func() *exec.Cmd { cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) @@ -75,67 +73,67 @@ func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { // if user is not empty, and OS is linux or macos // execute php worker from that particular user - if app.cfg.User != "" { - err := util.ExecuteFromUser(cmd, app.cfg.User) + if server.cfg.User != "" { + err := util.ExecuteFromUser(cmd, server.cfg.User) if err != nil { return nil } } - cmd.Env = app.setEnv(env) + cmd.Env = server.setEnv(env) return cmd }, nil } // NewWorker issues new standalone worker. -func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { +func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { const op = errors.Op("new worker") - spawnCmd, err := app.CmdFactory(env) + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) } - w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) + w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd()) if err != nil { return nil, errors.E(op, err) } - w.AddListener(app.collectLogs) + w.AddListener(server.collectLogs) return w, nil } // NewWorkerPool issues new worker pool. -func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { - spawnCmd, err := app.CmdFactory(env) +func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, err } - p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt) if err != nil { return nil, err } - p.AddListener(app.collectLogs) + p.AddListener(server.collectLogs) return p, nil } // creates relay and worker factory. -func (app *Plugin) initFactory() (roadrunner.Factory, error) { +func (server *Plugin) initFactory() (roadrunner.Factory, error) { const op = errors.Op("network factory init") - if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } - dsn := strings.Split(app.cfg.Relay, "://") + dsn := strings.Split(server.cfg.Relay, "://") if len(dsn) != 2 { return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } - lsn, err := util.CreateListener(app.cfg.Relay) + lsn, err := util.CreateListener(server.cfg.Relay) if err != nil { return nil, errors.E(op, errors.Network, err) } @@ -143,16 +141,16 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { switch dsn[0] { // sockets group case "unix": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil case "tcp": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } -func (app *Plugin) setEnv(e server.Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) +func (server *Plugin) setEnv(e server.Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) } @@ -160,13 +158,13 @@ func (app *Plugin) setEnv(e server.Env) []string { return env } -func (app *Plugin) collectLogs(event interface{}) { +func (server *Plugin) collectLogs(event interface{}) { if we, ok := event.(roadrunner.WorkerEvent); ok { switch we.Event { case roadrunner.EventWorkerError: - app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) + server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) case roadrunner.EventWorkerLog: - app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) } } } diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php new file mode 100644 index 00000000..3fcf8e29 --- /dev/null +++ b/src/Diactoros/ServerRequestFactory.php @@ -0,0 +1,26 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Diactoros; + +use Psr\Http\Message\ServerRequestFactoryInterface; +use Psr\Http\Message\ServerRequestInterface; +use Zend\Diactoros\ServerRequest; + +final class ServerRequestFactory implements ServerRequestFactoryInterface +{ + /** + * @inheritdoc + */ + public function createServerRequest(string $method, $uri, array $serverParams = []): ServerRequestInterface + { + $uploadedFiles = []; + return new ServerRequest($serverParams, $uploadedFiles, $uri, $method); + } +} diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php new file mode 100644 index 00000000..cc0a5306 --- /dev/null +++ b/src/Diactoros/StreamFactory.php @@ -0,0 +1,57 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Diactoros; + +use RuntimeException; +use Psr\Http\Message\StreamFactoryInterface; +use Psr\Http\Message\StreamInterface; +use Zend\Diactoros\Stream; + +final class StreamFactory implements StreamFactoryInterface +{ + /** + * @inheritdoc + * @throws RuntimeException + */ + public function createStream(string $content = ''): StreamInterface + { + $resource = fopen('php://temp', 'rb+'); + + if (! \is_resource($resource)) { + throw new RuntimeException('Cannot create stream'); + } + + fwrite($resource, $content); + rewind($resource); + return $this->createStreamFromResource($resource); + } + + /** + * @inheritdoc + */ + public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface + { + $resource = fopen($file, $mode); + + if (! \is_resource($resource)) { + throw new RuntimeException('Cannot create stream'); + } + + return $this->createStreamFromResource($resource); + } + + /** + * @inheritdoc + */ + public function createStreamFromResource($resource): StreamInterface + { + return new Stream($resource); + } +} diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php new file mode 100644 index 00000000..45773287 --- /dev/null +++ b/src/Diactoros/UploadedFileFactory.php @@ -0,0 +1,36 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Diactoros; + +use Psr\Http\Message\StreamInterface; +use Psr\Http\Message\UploadedFileFactoryInterface; +use Psr\Http\Message\UploadedFileInterface; +use Zend\Diactoros\UploadedFile; + +final class UploadedFileFactory implements UploadedFileFactoryInterface +{ + /** + * @inheritdoc + */ + public function createUploadedFile( + StreamInterface $stream, + int $size = null, + int $error = \UPLOAD_ERR_OK, + string $clientFilename = null, + string $clientMediaType = null + ): UploadedFileInterface { + if ($size === null) { + $size = (int) $stream->getSize(); + } + + /** @var resource $stream */ + return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType); + } +} diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php index d5b738b8..d5b738b8 100755..100644 --- a/src/Exception/MetricException.php +++ b/src/Exception/MetricException.php diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php index cd657502..cd657502 100755..100644 --- a/src/Exception/RoadRunnerException.php +++ b/src/Exception/RoadRunnerException.php diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php new file mode 100644 index 00000000..43967893 --- /dev/null +++ b/src/Exceptions/RoadRunnerException.php @@ -0,0 +1,18 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Exceptions; + +/** + * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead + */ +class RoadRunnerException extends \RuntimeException +{ +} diff --git a/src/HttpClient.php b/src/HttpClient.php new file mode 100644 index 00000000..4ca152c8 --- /dev/null +++ b/src/HttpClient.php @@ -0,0 +1,75 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Alex Bond + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +final class HttpClient +{ + /** @var Worker */ + private $worker; + + /** + * @param Worker $worker + */ + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] + * or null if termination request or invalid context. + */ + public function acceptRequest(): ?array + { + $body = $this->getWorker()->receive($ctx); + if (empty($body) && empty($ctx)) { + // termination request + return null; + } + + $ctx = json_decode($ctx, true); + if ($ctx === null) { + // invalid context + return null; + } + + return ['ctx' => $ctx, 'body' => $body]; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if (empty($headers)) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } +} diff --git a/src/Metrics.php b/src/Metrics.php new file mode 100644 index 00000000..d6b6e1da --- /dev/null +++ b/src/Metrics.php @@ -0,0 +1,80 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\RPCException; +use Spiral\Goridge\RPC; +use Spiral\RoadRunner\Exception\MetricException; + +/** + * Application metrics. + */ +final class Metrics implements MetricsInterface +{ + /** @var RPC */ + private $rpc; + + /** + * @param RPC $rpc + */ + public function __construct(RPC $rpc) + { + $this->rpc = $rpc; + } + + /** + * @inheritDoc + */ + public function add(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function sub(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function observe(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function set(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } +} diff --git a/src/MetricsInterface.php b/src/MetricsInterface.php new file mode 100644 index 00000000..ec2009b0 --- /dev/null +++ b/src/MetricsInterface.php @@ -0,0 +1,64 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\MetricException; + +interface MetricsInterface +{ + /** + * Add collector value. Fallback to appropriate method of related collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function add(string $collector, float $value, array $labels = []); + + /** + * Subtract the collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function sub(string $collector, float $value, array $labels = []); + + /** + * Observe collector value, only for histogram and summary collectors. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function observe(string $collector, float $value, array $labels = []); + + /** + * Set collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function set(string $collector, float $value, array $labels = []); +} diff --git a/src/PSR7Client.php b/src/PSR7Client.php new file mode 100644 index 00000000..777dd891 --- /dev/null +++ b/src/PSR7Client.php @@ -0,0 +1,217 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestFactoryInterface; +use Psr\Http\Message\ServerRequestInterface; +use Psr\Http\Message\StreamFactoryInterface; +use Psr\Http\Message\UploadedFileFactoryInterface; +use Psr\Http\Message\UploadedFileInterface; + +/** + * Manages PSR-7 request and response. + */ +class PSR7Client +{ + /** @var HttpClient */ + private $httpClient; + + /** @var ServerRequestFactoryInterface */ + private $requestFactory; + + /** @var StreamFactoryInterface */ + private $streamFactory; + + /** @var UploadedFileFactoryInterface */ + private $uploadsFactory; + + /** @var mixed[] */ + private $originalServer = []; + + /** @var string[] Valid values for HTTP protocol version */ + private static $allowedVersions = ['1.0', '1.1', '2',]; + + /** + * @param Worker $worker + * @param ServerRequestFactoryInterface|null $requestFactory + * @param StreamFactoryInterface|null $streamFactory + * @param UploadedFileFactoryInterface|null $uploadsFactory + */ + public function __construct( + Worker $worker, + ServerRequestFactoryInterface $requestFactory = null, + StreamFactoryInterface $streamFactory = null, + UploadedFileFactoryInterface $uploadsFactory = null + ) { + $this->httpClient = new HttpClient($worker); + $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); + $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); + $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); + $this->originalServer = $_SERVER; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->httpClient->getWorker(); + } + + /** + * @return ServerRequestInterface|null + */ + public function acceptRequest(): ?ServerRequestInterface + { + $rawRequest = $this->httpClient->acceptRequest(); + if ($rawRequest === null) { + return null; + } + + $_SERVER = $this->configureServer($rawRequest['ctx']); + + $request = $this->requestFactory->createServerRequest( + $rawRequest['ctx']['method'], + $rawRequest['ctx']['uri'], + $_SERVER + ); + + parse_str($rawRequest['ctx']['rawQuery'], $query); + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) + ->withCookieParams($rawRequest['ctx']['cookies']) + ->withQueryParams($query) + ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); + + foreach ($rawRequest['ctx']['attributes'] as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($rawRequest['ctx']['headers'] as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($rawRequest['ctx']['parsed']) { + return $request->withParsedBody(json_decode($rawRequest['body'], true)); + } + + if ($rawRequest['body'] !== null) { + return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); + } + + return $request; + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response): void + { + $this->httpClient->respond( + $response->getStatusCode(), + $response->getBody()->__toString(), + $response->getHeaders() + ); + } + + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param mixed[] $ctx + * @return mixed[] + */ + protected function configureServer(array $ctx): array + { + $server = $this->originalServer; + + $server['REQUEST_URI'] = $ctx['uri']; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; + $server['REQUEST_METHOD'] = $ctx['method']; + + $server['HTTP_USER_AGENT'] = ''; + foreach ($ctx['headers'] as $key => $value) { + $key = strtoupper(str_replace('-', '_', $key)); + if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { + $server[$key] = implode(', ', $value); + } else { + $server['HTTP_' . $key] = implode(', ', $value); + } + } + + return $server; + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array[] $files + * + * @return UploadedFileInterface[]|mixed[] + */ + private function wrapUploads($files): array + { + if (empty($files)) { + return []; + } + + $result = []; + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); + continue; + } + + if (UPLOAD_ERR_OK === $f['error']) { + $stream = $this->streamFactory->createStreamFromFile($f['tmpName']); + } else { + $stream = $this->streamFactory->createStream(); + } + + $result[$index] = $this->uploadsFactory->createUploadedFile( + $stream, + $f['size'], + $f['error'], + $f['name'], + $f['mime'] + ); + } + + return $result; + } + + /** + * Normalize HTTP protocol version to valid values + * + * @param string $version + * @return string + */ + private static function fetchProtocolVersion(string $version): string + { + $v = substr($version, 5); + + if ($v === '2.0') { + return '2'; + } + + // Fallback for values outside of valid protocol versions + if (!in_array($v, static::$allowedVersions, true)) { + return '1.1'; + } + + return $v; + } +} diff --git a/src/Worker.php b/src/Worker.php index d509562e..d509562e 100755..100644 --- a/src/Worker.php +++ b/src/Worker.php diff --git a/static_pool.go b/static_pool.go index c1dacd8d..d5511018 100755 --- a/static_pool.go +++ b/static_pool.go @@ -54,6 +54,9 @@ type StaticPool struct { // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) { const op = errors.Op("NewPool") + if factory == nil { + return nil, errors.E(op, errors.Str("no factory initialized")) + } cfg.InitDefaults() if cfg.Debug { @@ -75,13 +78,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) if err != nil { - return nil, err + return nil, errors.E(op, err) } // put stack in the pool err = p.ww.AddToWatch(ctx, workers) if err != nil { - return nil, err + return nil, errors.E(op, err) } p.errEncoder = defaultErrEncoder(p) |