From 3cbdd3d3e44b3b4e72565d666391e3b732950774 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 17 Nov 2020 16:25:35 +0300 Subject: Get http working with new container --- composer.json | 7 +- composer.lock | 389 +++++++++++++++++++++++---------- go.mod | 4 +- go.sum | 4 + interfaces/app/interface.go | 15 -- plugins/http/config.go | 59 +++-- plugins/http/plugin.go | 252 ++++++++++++--------- plugins/http/response.go | 4 +- plugins/http/rpc.go | 62 +++--- plugins/http/test/.rr-http.yaml | 37 ++++ plugins/http/test/http_test.go | 53 ++++- plugins/http/test/psr-worker.php | 23 ++ plugins/http/test/rr-http.yaml | 34 --- plugins/server/plugin.go | 78 ++++--- src/Diactoros/ServerRequestFactory.php | 26 +++ src/Diactoros/StreamFactory.php | 57 +++++ src/Diactoros/UploadedFileFactory.php | 36 +++ src/Exception/MetricException.php | 0 src/Exception/RoadRunnerException.php | 2 +- src/Exceptions/RoadRunnerException.php | 18 ++ src/Http/HttpClient.php | 75 ------- src/Http/PSR7Client.php | 217 ------------------ src/HttpClient.php | 75 +++++++ src/Logger/.empty | 0 src/Metrics.php | 80 +++++++ src/Metrics/Metrics.php | 80 ------- src/Metrics/MetricsInterface.php | 64 ------ src/MetricsInterface.php | 64 ++++++ src/PSR7Client.php | 217 ++++++++++++++++++ src/Worker.php | 0 src/WorkerInterface.php | 0 static_pool.go | 3 + 32 files changed, 1225 insertions(+), 810 deletions(-) delete mode 100644 interfaces/app/interface.go create mode 100644 plugins/http/test/.rr-http.yaml create mode 100644 plugins/http/test/psr-worker.php delete mode 100644 plugins/http/test/rr-http.yaml create mode 100644 src/Diactoros/ServerRequestFactory.php create mode 100644 src/Diactoros/StreamFactory.php create mode 100644 src/Diactoros/UploadedFileFactory.php mode change 100755 => 100644 src/Exception/MetricException.php mode change 100755 => 100644 src/Exception/RoadRunnerException.php create mode 100644 src/Exceptions/RoadRunnerException.php delete mode 100644 src/Http/HttpClient.php delete mode 100644 src/Http/PSR7Client.php create mode 100644 src/HttpClient.php delete mode 100644 src/Logger/.empty create mode 100644 src/Metrics.php delete mode 100644 src/Metrics/Metrics.php delete mode 100644 src/Metrics/MetricsInterface.php create mode 100644 src/MetricsInterface.php create mode 100644 src/PSR7Client.php mode change 100755 => 100644 src/Worker.php delete mode 100644 src/WorkerInterface.php diff --git a/composer.json b/composer.json index aef75b08..283eaab1 100755 --- a/composer.json +++ b/composer.json @@ -18,14 +18,15 @@ "ext-json": "*", "ext-curl": "*", "spiral/goridge": "^2.4.2", - "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0" + "psr/http-factory": "^1.0", + "psr/http-message": "^1.0", + "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0", + "laminas/laminas-diactoros": "^1.3 || ^2.0" }, "config": { "vendor-dir": "vendor_php" }, "require-dev": { - "psr/http-factory": "^1.0", - "psr/http-message": "^1.0", "phpstan/phpstan": "~0.12" }, "scripts": { diff --git a/composer.lock b/composer.lock index 183f9fef..ded194f5 100755 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,167 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "95535b37e4eb6476a2f89ea1b0f16e48", + "content-hash": "439018483d4d3a37c3d369d2587b8311", "packages": [ + { + "name": "laminas/laminas-diactoros", + "version": "2.4.1", + "source": { + "type": "git", + "url": "https://github.com/laminas/laminas-diactoros.git", + "reference": "36ef09b73e884135d2059cc498c938e90821bb57" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/laminas/laminas-diactoros/zipball/36ef09b73e884135d2059cc498c938e90821bb57", + "reference": "36ef09b73e884135d2059cc498c938e90821bb57", + "shasum": "" + }, + "require": { + "laminas/laminas-zendframework-bridge": "^1.0", + "php": "^7.1", + "psr/http-factory": "^1.0", + "psr/http-message": "^1.0" + }, + "conflict": { + "phpspec/prophecy": "<1.9.0" + }, + "provide": { + "psr/http-factory-implementation": "1.0", + "psr/http-message-implementation": "1.0" + }, + "replace": { + "zendframework/zend-diactoros": "^2.2.1" + }, + "require-dev": { + "ext-curl": "*", + "ext-dom": "*", + "ext-gd": "*", + "ext-libxml": "*", + "http-interop/http-factory-tests": "^0.5.0", + "laminas/laminas-coding-standard": "~1.0.0", + "php-http/psr7-integration-tests": "^1.0", + "phpunit/phpunit": "^7.5.18" + }, + "type": "library", + "extra": { + "laminas": { + "config-provider": "Laminas\\Diactoros\\ConfigProvider", + "module": "Laminas\\Diactoros" + } + }, + "autoload": { + "files": [ + "src/functions/create_uploaded_file.php", + "src/functions/marshal_headers_from_sapi.php", + "src/functions/marshal_method_from_sapi.php", + "src/functions/marshal_protocol_version_from_sapi.php", + "src/functions/marshal_uri_from_sapi.php", + "src/functions/normalize_server.php", + "src/functions/normalize_uploaded_files.php", + "src/functions/parse_cookie_header.php", + "src/functions/create_uploaded_file.legacy.php", + "src/functions/marshal_headers_from_sapi.legacy.php", + "src/functions/marshal_method_from_sapi.legacy.php", + "src/functions/marshal_protocol_version_from_sapi.legacy.php", + "src/functions/marshal_uri_from_sapi.legacy.php", + "src/functions/normalize_server.legacy.php", + "src/functions/normalize_uploaded_files.legacy.php", + "src/functions/parse_cookie_header.legacy.php" + ], + "psr-4": { + "Laminas\\Diactoros\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "description": "PSR HTTP Message implementations", + "homepage": "https://laminas.dev", + "keywords": [ + "http", + "laminas", + "psr", + "psr-17", + "psr-7" + ], + "support": { + "chat": "https://laminas.dev/chat", + "docs": "https://docs.laminas.dev/laminas-diactoros/", + "forum": "https://discourse.laminas.dev", + "issues": "https://github.com/laminas/laminas-diactoros/issues", + "rss": "https://github.com/laminas/laminas-diactoros/releases.atom", + "source": "https://github.com/laminas/laminas-diactoros" + }, + "funding": [ + { + "url": "https://funding.communitybridge.org/projects/laminas-project", + "type": "community_bridge" + } + ], + "time": "2020-09-03T14:29:41+00:00" + }, + { + "name": "laminas/laminas-zendframework-bridge", + "version": "1.1.1", + "source": { + "type": "git", + "url": "https://github.com/laminas/laminas-zendframework-bridge.git", + "reference": "6ede70583e101030bcace4dcddd648f760ddf642" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/laminas/laminas-zendframework-bridge/zipball/6ede70583e101030bcace4dcddd648f760ddf642", + "reference": "6ede70583e101030bcace4dcddd648f760ddf642", + "shasum": "" + }, + "require": { + "php": "^5.6 || ^7.0 || ^8.0" + }, + "require-dev": { + "phpunit/phpunit": "^5.7 || ^6.5 || ^7.5 || ^8.1 || ^9.3", + "squizlabs/php_codesniffer": "^3.5" + }, + "type": "library", + "extra": { + "laminas": { + "module": "Laminas\\ZendFrameworkBridge" + } + }, + "autoload": { + "files": [ + "src/autoload.php" + ], + "psr-4": { + "Laminas\\ZendFrameworkBridge\\": "src//" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "description": "Alias legacy ZF class names to Laminas Project equivalents.", + "keywords": [ + "ZendFramework", + "autoloading", + "laminas", + "zf" + ], + "support": { + "forum": "https://discourse.laminas.dev/", + "issues": "https://github.com/laminas/laminas-zendframework-bridge/issues", + "rss": "https://github.com/laminas/laminas-zendframework-bridge/releases.atom", + "source": "https://github.com/laminas/laminas-zendframework-bridge" + }, + "funding": [ + { + "url": "https://funding.communitybridge.org/projects/laminas-project", + "type": "community_bridge" + } + ], + "time": "2020-09-14T14:23:00+00:00" + }, { "name": "psr/container", "version": "1.0.0", @@ -59,6 +218,114 @@ }, "time": "2017-02-14T16:28:37+00:00" }, + { + "name": "psr/http-factory", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-factory.git", + "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-factory/zipball/12ac7fcd07e5b077433f5f2bee95b3a771bf61be", + "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be", + "shasum": "" + }, + "require": { + "php": ">=7.0.0", + "psr/http-message": "^1.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interfaces for PSR-7 HTTP message factories", + "keywords": [ + "factory", + "http", + "message", + "psr", + "psr-17", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-factory/tree/master" + }, + "time": "2019-04-30T12:38:16+00:00" + }, + { + "name": "psr/http-message", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-message.git", + "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363", + "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363", + "shasum": "" + }, + "require": { + "php": ">=5.3.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP messages", + "homepage": "https://github.com/php-fig/http-message", + "keywords": [ + "http", + "http-message", + "psr", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-message/tree/master" + }, + "time": "2016-08-06T14:39:51+00:00" + }, { "name": "spiral/goridge", "version": "v2.4.5", @@ -851,16 +1118,16 @@ "packages-dev": [ { "name": "phpstan/phpstan", - "version": "0.12.53", + "version": "0.12.56", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "dbbdb0d7c2434ecd5289f6114d16473e694caa67" + "reference": "007fd5d700c41e1bb27795fae15a2383f8fa4ba1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/dbbdb0d7c2434ecd5289f6114d16473e694caa67", - "reference": "dbbdb0d7c2434ecd5289f6114d16473e694caa67", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/007fd5d700c41e1bb27795fae15a2383f8fa4ba1", + "reference": "007fd5d700c41e1bb27795fae15a2383f8fa4ba1", "shasum": "" }, "require": { @@ -891,7 +1158,7 @@ "description": "PHPStan - PHP Static Analysis Tool", "support": { "issues": "https://github.com/phpstan/phpstan/issues", - "source": "https://github.com/phpstan/phpstan/tree/0.12.53" + "source": "https://github.com/phpstan/phpstan/tree/0.12.56" }, "funding": [ { @@ -907,115 +1174,7 @@ "type": "tidelift" } ], - "time": "2020-11-01T14:51:50+00:00" - }, - { - "name": "psr/http-factory", - "version": "1.0.1", - "source": { - "type": "git", - "url": "https://github.com/php-fig/http-factory.git", - "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-fig/http-factory/zipball/12ac7fcd07e5b077433f5f2bee95b3a771bf61be", - "reference": "12ac7fcd07e5b077433f5f2bee95b3a771bf61be", - "shasum": "" - }, - "require": { - "php": ">=7.0.0", - "psr/http-message": "^1.0" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "1.0.x-dev" - } - }, - "autoload": { - "psr-4": { - "Psr\\Http\\Message\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "PHP-FIG", - "homepage": "http://www.php-fig.org/" - } - ], - "description": "Common interfaces for PSR-7 HTTP message factories", - "keywords": [ - "factory", - "http", - "message", - "psr", - "psr-17", - "psr-7", - "request", - "response" - ], - "support": { - "source": "https://github.com/php-fig/http-factory/tree/master" - }, - "time": "2019-04-30T12:38:16+00:00" - }, - { - "name": "psr/http-message", - "version": "1.0.1", - "source": { - "type": "git", - "url": "https://github.com/php-fig/http-message.git", - "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-fig/http-message/zipball/f6561bf28d520154e4b0ec72be95418abe6d9363", - "reference": "f6561bf28d520154e4b0ec72be95418abe6d9363", - "shasum": "" - }, - "require": { - "php": ">=5.3.0" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "1.0.x-dev" - } - }, - "autoload": { - "psr-4": { - "Psr\\Http\\Message\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "PHP-FIG", - "homepage": "http://www.php-fig.org/" - } - ], - "description": "Common interface for HTTP messages", - "homepage": "https://github.com/php-fig/http-message", - "keywords": [ - "http", - "http-message", - "psr", - "psr-7", - "request", - "response" - ], - "support": { - "source": "https://github.com/php-fig/http-message/tree/master" - }, - "time": "2016-08-06T14:39:51+00:00" + "time": "2020-11-16T22:59:18+00:00" } ], "aliases": [], diff --git a/go.mod b/go.mod index 21500c30..0cbbfc70 100755 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/fatih/color v1.10.0 + github.com/hashicorp/go-multierror v1.0.0 github.com/json-iterator/go v1.1.10 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 @@ -16,6 +17,7 @@ require ( github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 + golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 -) \ No newline at end of file +) diff --git a/go.sum b/go.sum index 343f4fcf..dcec5726 100755 --- a/go.sum +++ b/go.sum @@ -115,10 +115,12 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= @@ -341,7 +343,9 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 h1:MsuvTghUPjX762sGLnGsxC3HM0B5r83wEtYcYR8/vRs= golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/interfaces/app/interface.go b/interfaces/app/interface.go deleted file mode 100644 index 4db7a094..00000000 --- a/interfaces/app/interface.go +++ /dev/null @@ -1,15 +0,0 @@ -package app - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -// WorkerFactory creates workers for the application. -type WorkerFactory interface { - CmdFactory(env map[string]string) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env map[string]string) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env map[string]string) (roadrunner.Pool, error) -} diff --git a/plugins/http/config.go b/plugins/http/config.go index b3d16b62..7922f485 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -11,9 +11,6 @@ import ( "github.com/spiral/roadrunner/v2" ) -type PoolConfig struct { -} - type ServerConfig struct { // Command includes command strings with all the parameters, example: "php worker.php pipes". Command string @@ -32,7 +29,7 @@ type ServerConfig struct { // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change // while server is running. - PoolCfg *roadrunner.PoolConfig + env map[string]string } @@ -61,8 +58,8 @@ type Config struct { // Uploads configures uploads configuration. Uploads *UploadsConfig - // Workers configures rr server and worker pool. - Workers *ServerConfig + // Pool configures worker pool. + Pool *roadrunner.PoolConfig } // FCGIConfig for FastCGI server. @@ -135,10 +132,10 @@ func (c *Config) EnableFCGI() bool { } // Hydrate must populate Config values using given Config source. Must return error if Config is not valid. -func (c *Config) Hydrate(cfg service.Config) error { - if c.Workers == nil { - c.Workers = &roadrunner.ServerConfig{} - } +func (c *Config) Hydrate(cfg Config) error { + //if c.Workers == nil { + // c.Workers = &ServerConfig{} + //} if c.HTTP2 == nil { c.HTTP2 = &HTTP2Config{} @@ -164,16 +161,16 @@ func (c *Config) Hydrate(cfg service.Config) error { if err != nil { return err } - err = c.Workers.InitDefaults() - if err != nil { - return err - } - - if err := cfg.Unmarshal(c); err != nil { - return err - } - - c.Workers.UpscaleDurations() + //err = c.Workers.InitDefaults() + //if err != nil { + // return err + //} + // + //if err := cfg.Unmarshal(c); err != nil { + // return err + //} + // + //c.Workers.UpscaleDurations() if c.TrustedSubnets == nil { // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses @@ -238,17 +235,17 @@ func (c *Config) Valid() error { return errors.New("malformed http2 config") } - if c.Workers == nil { - return errors.New("malformed workers config") - } - - if c.Workers.Pool == nil { - return errors.New("malformed workers config (pool config is missing)") - } - - if err := c.Workers.Pool.Valid(); err != nil { - return err - } + //if c.Workers == nil { + // return errors.New("malformed workers config") + //} + // + //if c.Workers.Pool == nil { + // return errors.New("malformed workers config (pool config is missing)") + //} + + //if err := c.Workers.Pool.Valid(); err != nil { + // return err + //} if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() { return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)") diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 24eaa68c..94b6c74b 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -12,11 +12,13 @@ import ( "strings" "sync" + "github.com/hashicorp/go-multierror" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - factory "github.com/spiral/roadrunner/v2/interfaces/app" "github.com/spiral/roadrunner/v2/interfaces/log" + factory "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/util" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sys/cpu" @@ -79,7 +81,7 @@ func (s *Plugin) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerFactory) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(ServiceName, &s.cfg) if err != nil { @@ -88,14 +90,18 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF s.log = log - p, err := app.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ - Debug: s.cfg.Workers.PoolCfg.Debug, - NumWorkers: s.cfg.Workers.PoolCfg.NumWorkers, - MaxJobs: s.cfg.Workers.PoolCfg.MaxJobs, - AllocateTimeout: s.cfg.Workers.PoolCfg.AllocateTimeout, - DestroyTimeout: s.cfg.Workers.PoolCfg.DestroyTimeout, + // Set needed env vars + env := make(map[string]string) + env["RR_HTTP"] = "true" + + p, err := server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: s.cfg.Pool.Debug, + NumWorkers: s.cfg.Pool.NumWorkers, + MaxJobs: s.cfg.Pool.MaxJobs, + AllocateTimeout: s.cfg.Pool.AllocateTimeout, + DestroyTimeout: s.cfg.Pool.DestroyTimeout, Supervisor: nil, - }, nil) + }, env) if err != nil { return errors.E(op, err) @@ -117,27 +123,29 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, app factory.WorkerF } // Serve serves the svc. -func (s *Plugin) Serve() error { +func (s *Plugin) Serve() chan error { s.Lock() + const op = errors.Op("serve http") + errCh := make(chan error, 2) - if s.env != nil { - if err := s.env.Copy(s.cfg.Workers); err != nil { - return nil - } - } - - s.cfg.Workers.CommandProducer = s.cprod - s.cfg.Workers.SetEnv("RR_HTTP", "true") - - s.rr = roadrunner.NewServer(s.cfg.Workers) - s.rr.Listen(s.throw) - - if s.controller != nil { - s.rr.Attach(s.controller) - } + //if s.env != nil { + // if err := s.env.Copy(s.cfg.Workers); err != nil { + // return nil + // } + //} + // + //s.cfg.Workers.CommandProducer = s.cprod + //s.cfg.Workers.SetEnv("RR_HTTP", "true") + // + //s.rr = roadrunner.NewServer(s.cfg.Workers) + //s.rr.Listen(s.throw) + // + //if s.controller != nil { + // s.rr.Attach(s.controller) + //} s.handler = &Handler{cfg: s.cfg, rr: s.rr} - s.handler.Listen(s.throw) + //s.handler.Listen(s.throw) if s.cfg.EnableHTTP() { if s.cfg.EnableH2C() { @@ -152,13 +160,15 @@ func (s *Plugin) Serve() error { if s.cfg.SSL.RootCA != "" { err := s.appendRootCa() if err != nil { - return err + errCh <- errors.E(op, err) + return errCh } } if s.cfg.EnableHTTP2() { if err := s.initHTTP2(); err != nil { - return err + errCh <- errors.E(op, err) + return errCh } } } @@ -169,21 +179,19 @@ func (s *Plugin) Serve() error { s.Unlock() - if err := s.rr.Start(); err != nil { - return err - } - defer s.rr.Stop() - - err := make(chan error, 3) + //if err := s.rr.Start(); err != nil { + // return err + //} + //defer s.rr.Stop() if s.http != nil { go func() { httpErr := s.http.ListenAndServe() if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr - } else { - err <- nil + errCh <- errors.E(op, httpErr) + return } + return }() } @@ -195,10 +203,10 @@ func (s *Plugin) Serve() error { ) if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr + errCh <- errors.E(op, httpErr) return } - err <- nil + return }() } @@ -206,72 +214,54 @@ func (s *Plugin) Serve() error { go func() { httpErr := s.serveFCGI() if httpErr != nil && httpErr != http.ErrServerClosed { - err <- httpErr + errCh <- errors.E(op, httpErr) return } - err <- nil + return }() } - return <-err + return errCh } // Stop stops the http. -func (s *Plugin) Stop() { +func (s *Plugin) Stop() error { s.Lock() defer s.Unlock() + var err error if s.fcgi != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.fcgi.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - // Stop() error - // push error from goroutines to the channel and block unil error or success shutdown or timeout - s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err)) - return - } - }() + err = s.fcgi.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the fcgi server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } if s.https != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.https.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err)) - return - } - }() + err = s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the https server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } if s.http != nil { - s.Add(1) - go func() { - defer s.Done() - err := s.http.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err)) - return - } - }() + err = s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error("error shutting down the http server", "error", err) + // write error and try to stop other transport + err = multierror.Append(err) + } } - s.Wait() -} - -// Server returns associated rr server (if any). -func (s *Plugin) Server() *roadrunner.Server { - s.Lock() - defer s.Unlock() - - return s.rr + return err } // ServeHTTP handles connection using set of middleware and rr PSR-7 server. -func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { target := &url.URL{ Scheme: "https", @@ -288,7 +278,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") } - r = attributes.Init(r) + //r = attributes.Init(r) // chaining middleware f := s.handler.ServeHTTP @@ -300,9 +290,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // append RootCA to the https server TLS config func (s *Plugin) appendRootCa() error { + const op = errors.Op("append root CA") rootCAs, err := x509.SystemCertPool() if err != nil { - s.throw(EventInitSSL, nil) + //s.throw(EventInitSSL, nil) return nil } if rootCAs == nil { @@ -311,20 +302,20 @@ func (s *Plugin) appendRootCa() error { CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA) if err != nil { - s.throw(EventInitSSL, nil) + //s.throw(EventInitSSL, nil) return err } // should append our CA cert ok := rootCAs.AppendCertsFromPEM(CA) if !ok { - return couldNotAppendPemError + return errors.E(op, errors.Str("could not append Certs from PEM")) } - config := &tls.Config{ + cfg := &tls.Config{ InsecureSkipVerify: false, RootCAs: rootCAs, } - s.http.TLSConfig = config + s.http.TLSConfig = cfg return nil } @@ -394,7 +385,7 @@ func (s *Plugin) initSSL() *http.Server { PreferServerCipherSuites: true, }, } - s.throw(EventInitSSL, server) + //s.throw(EventInitSSL, server) return server } @@ -422,16 +413,16 @@ func (s *Plugin) serveFCGI() error { } // throw handles service, server and pool events. -func (s *Plugin) throw(event int, ctx interface{}) { - for _, l := range s.lsns { - l(event, ctx) - } - - if event == roadrunner.EventServerFailure { - // underlying rr server is dead - s.Stop() - } -} +//func (s *Plugin) throw(event int, ctx interface{}) { +// for _, l := range s.lsns { +// l(event, ctx) +// } +// +// if event == roadrunner.EventServerFailure { +// // underlying rr server is dead +// s.Stop() +// } +//} // tlsAddr replaces listen or host port with port configured by SSL config. func (s *Plugin) tlsAddr(host string, forcePort bool) string { @@ -444,3 +435,66 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string { return host } + +// Server returns associated rr workers +func (s *Plugin) Workers() []roadrunner.WorkerBase { + return s.rr.Workers() +} + +func (s *Plugin) Reset() error { + // re-read the config + // destroy the pool + // attach new one + + //s.mup.Lock() + //defer s.mup.Unlock() + // + //s.mu.Lock() + //if !s.started { + // s.cfg = cfg + // s.mu.Unlock() + // return nil + //} + //s.mu.Unlock() + // + //if s.cfg.Differs(cfg) { + // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") + //} + // + //s.mu.Lock() + //previous := s.pool + //pWatcher := s.pController + //s.mu.Unlock() + // + //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) + //if err != nil { + // return err + //} + // + //pool.Listen(s.poolListener) + // + //s.mu.Lock() + //s.cfg.Pool, s.pool = cfg.Pool, pool + // + //if s.controller != nil { + // s.pController = s.controller.Attach(pool) + //} + // + //s.mu.Unlock() + // + //s.throw(EventPoolConstruct, pool) + // + //if previous != nil { + // go func(previous Pool, pWatcher Controller) { + // s.throw(EventPoolDestruct, previous) + // if pWatcher != nil { + // pWatcher.Detach() + // } + // + // previous.Destroy() + // }(previous, pWatcher) + //} + // + //return nil + return nil +} diff --git a/plugins/http/response.go b/plugins/http/response.go index 88848b9d..c3de434f 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,9 +6,9 @@ import ( "strings" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2" ) - // Response handles PSR7 response logic. type Response struct { // Status contains response status. @@ -22,7 +22,7 @@ type Response struct { } // NewResponse creates new response based on given rr payload. -func NewResponse(p *roadrunner.Payload) (*Response, error) { +func NewResponse(p roadrunner.Payload) (*Response, error) { r := &Response{body: p.Body} j := json.ConfigCompatibleWithStandardLibrary if err := j.Unmarshal(p.Context, r); err != nil { diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go index 7b38dece..56d8f1a1 100644 --- a/plugins/http/rpc.go +++ b/plugins/http/rpc.go @@ -1,34 +1,34 @@ package http -import ( - "github.com/pkg/errors" - "github.com/spiral/roadrunner/util" -) +//import ( +// "github.com/pkg/errors" +// "github.com/spiral/roadrunner/util" +//) -type rpcServer struct{ svc *Service } - -// WorkerList contains list of workers. -type WorkerList struct { - // Workers is list of workers. - Workers []*util.State `json:"workers"` -} - -// Reset resets underlying RR worker pool and restarts all of it's workers. -func (rpc *rpcServer) Reset(reset bool, r *string) error { - if rpc.svc == nil || rpc.svc.handler == nil { - return errors.New("http server is not running") - } - - *r = "OK" - return rpc.svc.Server().Reset() -} - -// Workers returns list of active workers and their stats. -func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { - if rpc.svc == nil || rpc.svc.handler == nil { - return errors.New("http server is not running") - } - - r.Workers, err = util.ServerState(rpc.svc.Server()) - return err -} +//type rpcServer struct{ svc *Plugin } +// +//// WorkerList contains list of workers. +//type WorkerList struct { +// // Workers is list of workers. +// Workers []*util.State `json:"workers"` +//} +// +//// Reset resets underlying RR worker pool and restarts all of it's workers. +//func (rpc *rpcServer) Reset(reset bool, r *string) error { +// if rpc.svc == nil || rpc.svc.handler == nil { +// return errors.New("http server is not running") +// } +// +// *r = "OK" +// return rpc.svc.Server().Reset() +//} +// +//// Workers returns list of active workers and their stats. +//func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { +// if rpc.svc == nil || rpc.svc.handler == nil { +// return errors.New("http server is not running") +// } +// +// r.Workers, err = util.ServerState(rpc.svc.Server()) +// return err +//} diff --git a/plugins/http/test/.rr-http.yaml b/plugins/http/test/.rr-http.yaml new file mode 100644 index 00000000..6fbfd378 --- /dev/null +++ b/plugins/http/test/.rr-http.yaml @@ -0,0 +1,37 @@ +server: + command: "php psr-worker.php" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relayTimeout: "20s" + +http: + debug: true + address: 0.0.0.0:8080 + maxRequestSize: 200 + middleware: [ "" ] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + numWorkers: 4 + maxJobs: 0 + allocateTimeout: 60s + destroyTimeout: 60s + + # ssl: + # port: 443 + # redirect: true + # cert: server.crt + # key: server.key + # rootCa: root.crt + fcgi: + address: tcp://0.0.0.0:6920 + http2: + enabled: false + h2c: false + maxConcurrentStreams: 128 + + diff --git a/plugins/http/test/http_test.go b/plugins/http/test/http_test.go index c109d930..07925d33 100644 --- a/plugins/http/test/http_test.go +++ b/plugins/http/test/http_test.go @@ -1,10 +1,18 @@ package test import ( + "os" + "os/signal" + "syscall" "testing" + "time" "github.com/spiral/endure" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + //rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" "github.com/stretchr/testify/assert" ) @@ -14,10 +22,51 @@ func TestHTTPInit(t *testing.T) { cfg := &config.Viper{ Path: ".rr-http.yaml", - Prefix: "", + Prefix: "rr", } + err = cont.RegisterAll( + cfg, + //&rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &http.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } - ) + ch, err := cont.Serve() assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } } diff --git a/plugins/http/test/psr-worker.php b/plugins/http/test/psr-worker.php new file mode 100644 index 00000000..65fc6bde --- /dev/null +++ b/plugins/http/test/psr-worker.php @@ -0,0 +1,23 @@ +acceptRequest()) { + try { + $resp = new \Zend\Diactoros\Response(); + $resp->getBody()->write("hello world"); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} \ No newline at end of file diff --git a/plugins/http/test/rr-http.yaml b/plugins/http/test/rr-http.yaml deleted file mode 100644 index 8a04a1f1..00000000 --- a/plugins/http/test/rr-http.yaml +++ /dev/null @@ -1,34 +0,0 @@ -http: - address: 0.0.0.0:8080 - maxRequestSize: 200 - middlewares: [ "" ] - uploads: - forbid: [ ".php", ".exe", ".bat" ] - trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] - workers: - command: "php psr-worker.php pipes" - user: "" - - # connection method (pipes, tcp://:9000, unix://socket.unix). default "pipes" - relay: "pipes" - - pool: - numWorkers: 4 - maxJobs: 0 - allocateTimeout: 60 - destroyTimeout: 60 - - ssl: - port: 443 - redirect: true - cert: server.crt - key: server.key - rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 - http2: - enabled: true - h2c: true - maxConcurrentStreams: 128 - - diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e096708a..4d606390 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -25,49 +25,47 @@ type Plugin struct { } // Init application provider. -func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error { const op = errors.Op("Init") - err := cfg.UnmarshalKey(ServiceName, &app.cfg) + err := cfg.UnmarshalKey(ServiceName, &server.cfg) if err != nil { return errors.E(op, errors.Init, err) } - app.cfg.InitDefaults() - app.log = log + server.cfg.InitDefaults() + server.log = log + + server.factory, err = server.initFactory() + if err != nil { + return errors.E(errors.Op("Init factory"), err) + } return nil } // Name contains service name. -func (app *Plugin) Name() string { +func (server *Plugin) Name() string { return ServiceName } -func (app *Plugin) Serve() chan error { +func (server *Plugin) Serve() chan error { errCh := make(chan error, 1) - var err error - - app.factory, err = app.initFactory() - if err != nil { - errCh <- errors.E(errors.Op("init factory"), err) - } - return errCh } -func (app *Plugin) Stop() error { - if app.factory == nil { +func (server *Plugin) Stop() error { + if server.factory == nil { return nil } - return app.factory.Close(context.Background()) + return server.factory.Close(context.Background()) } // CmdFactory provides worker command factory assocated with given context. -func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { +func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) return func() *exec.Cmd { cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) @@ -75,67 +73,67 @@ func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { // if user is not empty, and OS is linux or macos // execute php worker from that particular user - if app.cfg.User != "" { - err := util.ExecuteFromUser(cmd, app.cfg.User) + if server.cfg.User != "" { + err := util.ExecuteFromUser(cmd, server.cfg.User) if err != nil { return nil } } - cmd.Env = app.setEnv(env) + cmd.Env = server.setEnv(env) return cmd }, nil } // NewWorker issues new standalone worker. -func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { +func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { const op = errors.Op("new worker") - spawnCmd, err := app.CmdFactory(env) + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) } - w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) + w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd()) if err != nil { return nil, errors.E(op, err) } - w.AddListener(app.collectLogs) + w.AddListener(server.collectLogs) return w, nil } // NewWorkerPool issues new worker pool. -func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { - spawnCmd, err := app.CmdFactory(env) +func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, err } - p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt) if err != nil { return nil, err } - p.AddListener(app.collectLogs) + p.AddListener(server.collectLogs) return p, nil } // creates relay and worker factory. -func (app *Plugin) initFactory() (roadrunner.Factory, error) { +func (server *Plugin) initFactory() (roadrunner.Factory, error) { const op = errors.Op("network factory init") - if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } - dsn := strings.Split(app.cfg.Relay, "://") + dsn := strings.Split(server.cfg.Relay, "://") if len(dsn) != 2 { return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } - lsn, err := util.CreateListener(app.cfg.Relay) + lsn, err := util.CreateListener(server.cfg.Relay) if err != nil { return nil, errors.E(op, errors.Network, err) } @@ -143,16 +141,16 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { switch dsn[0] { // sockets group case "unix": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil case "tcp": - return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } -func (app *Plugin) setEnv(e server.Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) +func (server *Plugin) setEnv(e server.Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) } @@ -160,13 +158,13 @@ func (app *Plugin) setEnv(e server.Env) []string { return env } -func (app *Plugin) collectLogs(event interface{}) { +func (server *Plugin) collectLogs(event interface{}) { if we, ok := event.(roadrunner.WorkerEvent); ok { switch we.Event { case roadrunner.EventWorkerError: - app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) + server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) case roadrunner.EventWorkerLog: - app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) + server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) } } } diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php new file mode 100644 index 00000000..3fcf8e29 --- /dev/null +++ b/src/Diactoros/ServerRequestFactory.php @@ -0,0 +1,26 @@ +createStreamFromResource($resource); + } + + /** + * @inheritdoc + */ + public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface + { + $resource = fopen($file, $mode); + + if (! \is_resource($resource)) { + throw new RuntimeException('Cannot create stream'); + } + + return $this->createStreamFromResource($resource); + } + + /** + * @inheritdoc + */ + public function createStreamFromResource($resource): StreamInterface + { + return new Stream($resource); + } +} diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php new file mode 100644 index 00000000..45773287 --- /dev/null +++ b/src/Diactoros/UploadedFileFactory.php @@ -0,0 +1,36 @@ +getSize(); + } + + /** @var resource $stream */ + return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType); + } +} diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php old mode 100755 new mode 100644 diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php old mode 100755 new mode 100644 index cd657502..f83c3dd4 --- a/src/Exception/RoadRunnerException.php +++ b/src/Exception/RoadRunnerException.php @@ -9,6 +9,6 @@ declare(strict_types=1); namespace Spiral\RoadRunner\Exception; -class RoadRunnerException extends \RuntimeException +class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException { } diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php new file mode 100644 index 00000000..43967893 --- /dev/null +++ b/src/Exceptions/RoadRunnerException.php @@ -0,0 +1,18 @@ +worker = $worker; - } - - /** - * @return Worker - */ - public function getWorker(): Worker - { - return $this->worker; - } - - /** - * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] - * or null if termination request or invalid context. - */ - public function acceptRequest(): ?array - { - $body = $this->getWorker()->receive($ctx); - if (empty($body) && empty($ctx)) { - // termination request - return null; - } - - $ctx = json_decode($ctx, true); - if ($ctx === null) { - // invalid context - return null; - } - - return ['ctx' => $ctx, 'body' => $body]; - } - - /** - * Send response to the application server. - * - * @param int $status Http status code - * @param string $body Body of response - * @param string[][] $headers An associative array of the message's headers. Each - * key MUST be a header name, and each value MUST be an array of strings - * for that header. - */ - public function respond(int $status, string $body, array $headers = []): void - { - if (empty($headers)) { - // this is required to represent empty header set as map and not as array - $headers = new \stdClass(); - } - - $this->getWorker()->send( - $body, - (string) json_encode(['status' => $status, 'headers' => $headers]) - ); - } -} diff --git a/src/Http/PSR7Client.php b/src/Http/PSR7Client.php deleted file mode 100644 index 777dd891..00000000 --- a/src/Http/PSR7Client.php +++ /dev/null @@ -1,217 +0,0 @@ -httpClient = new HttpClient($worker); - $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); - $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); - $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); - $this->originalServer = $_SERVER; - } - - /** - * @return Worker - */ - public function getWorker(): Worker - { - return $this->httpClient->getWorker(); - } - - /** - * @return ServerRequestInterface|null - */ - public function acceptRequest(): ?ServerRequestInterface - { - $rawRequest = $this->httpClient->acceptRequest(); - if ($rawRequest === null) { - return null; - } - - $_SERVER = $this->configureServer($rawRequest['ctx']); - - $request = $this->requestFactory->createServerRequest( - $rawRequest['ctx']['method'], - $rawRequest['ctx']['uri'], - $_SERVER - ); - - parse_str($rawRequest['ctx']['rawQuery'], $query); - - $request = $request - ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) - ->withCookieParams($rawRequest['ctx']['cookies']) - ->withQueryParams($query) - ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); - - foreach ($rawRequest['ctx']['attributes'] as $name => $value) { - $request = $request->withAttribute($name, $value); - } - - foreach ($rawRequest['ctx']['headers'] as $name => $value) { - $request = $request->withHeader($name, $value); - } - - if ($rawRequest['ctx']['parsed']) { - return $request->withParsedBody(json_decode($rawRequest['body'], true)); - } - - if ($rawRequest['body'] !== null) { - return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); - } - - return $request; - } - - /** - * Send response to the application server. - * - * @param ResponseInterface $response - */ - public function respond(ResponseInterface $response): void - { - $this->httpClient->respond( - $response->getStatusCode(), - $response->getBody()->__toString(), - $response->getHeaders() - ); - } - - /** - * Returns altered copy of _SERVER variable. Sets ip-address, - * request-time and other values. - * - * @param mixed[] $ctx - * @return mixed[] - */ - protected function configureServer(array $ctx): array - { - $server = $this->originalServer; - - $server['REQUEST_URI'] = $ctx['uri']; - $server['REQUEST_TIME'] = time(); - $server['REQUEST_TIME_FLOAT'] = microtime(true); - $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; - $server['REQUEST_METHOD'] = $ctx['method']; - - $server['HTTP_USER_AGENT'] = ''; - foreach ($ctx['headers'] as $key => $value) { - $key = strtoupper(str_replace('-', '_', $key)); - if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { - $server[$key] = implode(', ', $value); - } else { - $server['HTTP_' . $key] = implode(', ', $value); - } - } - - return $server; - } - - /** - * Wraps all uploaded files with UploadedFile. - * - * @param array[] $files - * - * @return UploadedFileInterface[]|mixed[] - */ - private function wrapUploads($files): array - { - if (empty($files)) { - return []; - } - - $result = []; - foreach ($files as $index => $f) { - if (!isset($f['name'])) { - $result[$index] = $this->wrapUploads($f); - continue; - } - - if (UPLOAD_ERR_OK === $f['error']) { - $stream = $this->streamFactory->createStreamFromFile($f['tmpName']); - } else { - $stream = $this->streamFactory->createStream(); - } - - $result[$index] = $this->uploadsFactory->createUploadedFile( - $stream, - $f['size'], - $f['error'], - $f['name'], - $f['mime'] - ); - } - - return $result; - } - - /** - * Normalize HTTP protocol version to valid values - * - * @param string $version - * @return string - */ - private static function fetchProtocolVersion(string $version): string - { - $v = substr($version, 5); - - if ($v === '2.0') { - return '2'; - } - - // Fallback for values outside of valid protocol versions - if (!in_array($v, static::$allowedVersions, true)) { - return '1.1'; - } - - return $v; - } -} diff --git a/src/HttpClient.php b/src/HttpClient.php new file mode 100644 index 00000000..4ca152c8 --- /dev/null +++ b/src/HttpClient.php @@ -0,0 +1,75 @@ +worker = $worker; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] + * or null if termination request or invalid context. + */ + public function acceptRequest(): ?array + { + $body = $this->getWorker()->receive($ctx); + if (empty($body) && empty($ctx)) { + // termination request + return null; + } + + $ctx = json_decode($ctx, true); + if ($ctx === null) { + // invalid context + return null; + } + + return ['ctx' => $ctx, 'body' => $body]; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if (empty($headers)) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } +} diff --git a/src/Logger/.empty b/src/Logger/.empty deleted file mode 100644 index e69de29b..00000000 diff --git a/src/Metrics.php b/src/Metrics.php new file mode 100644 index 00000000..d6b6e1da --- /dev/null +++ b/src/Metrics.php @@ -0,0 +1,80 @@ +rpc = $rpc; + } + + /** + * @inheritDoc + */ + public function add(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function sub(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function observe(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function set(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } +} diff --git a/src/Metrics/Metrics.php b/src/Metrics/Metrics.php deleted file mode 100644 index d6b6e1da..00000000 --- a/src/Metrics/Metrics.php +++ /dev/null @@ -1,80 +0,0 @@ -rpc = $rpc; - } - - /** - * @inheritDoc - */ - public function add(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function sub(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function observe(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function set(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } -} diff --git a/src/Metrics/MetricsInterface.php b/src/Metrics/MetricsInterface.php deleted file mode 100644 index ec2009b0..00000000 --- a/src/Metrics/MetricsInterface.php +++ /dev/null @@ -1,64 +0,0 @@ -httpClient = new HttpClient($worker); + $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); + $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); + $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); + $this->originalServer = $_SERVER; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->httpClient->getWorker(); + } + + /** + * @return ServerRequestInterface|null + */ + public function acceptRequest(): ?ServerRequestInterface + { + $rawRequest = $this->httpClient->acceptRequest(); + if ($rawRequest === null) { + return null; + } + + $_SERVER = $this->configureServer($rawRequest['ctx']); + + $request = $this->requestFactory->createServerRequest( + $rawRequest['ctx']['method'], + $rawRequest['ctx']['uri'], + $_SERVER + ); + + parse_str($rawRequest['ctx']['rawQuery'], $query); + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) + ->withCookieParams($rawRequest['ctx']['cookies']) + ->withQueryParams($query) + ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); + + foreach ($rawRequest['ctx']['attributes'] as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($rawRequest['ctx']['headers'] as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($rawRequest['ctx']['parsed']) { + return $request->withParsedBody(json_decode($rawRequest['body'], true)); + } + + if ($rawRequest['body'] !== null) { + return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); + } + + return $request; + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response): void + { + $this->httpClient->respond( + $response->getStatusCode(), + $response->getBody()->__toString(), + $response->getHeaders() + ); + } + + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param mixed[] $ctx + * @return mixed[] + */ + protected function configureServer(array $ctx): array + { + $server = $this->originalServer; + + $server['REQUEST_URI'] = $ctx['uri']; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; + $server['REQUEST_METHOD'] = $ctx['method']; + + $server['HTTP_USER_AGENT'] = ''; + foreach ($ctx['headers'] as $key => $value) { + $key = strtoupper(str_replace('-', '_', $key)); + if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { + $server[$key] = implode(', ', $value); + } else { + $server['HTTP_' . $key] = implode(', ', $value); + } + } + + return $server; + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array[] $files + * + * @return UploadedFileInterface[]|mixed[] + */ + private function wrapUploads($files): array + { + if (empty($files)) { + return []; + } + + $result = []; + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); + continue; + } + + if (UPLOAD_ERR_OK === $f['error']) { + $stream = $this->streamFactory->createStreamFromFile($f['tmpName']); + } else { + $stream = $this->streamFactory->createStream(); + } + + $result[$index] = $this->uploadsFactory->createUploadedFile( + $stream, + $f['size'], + $f['error'], + $f['name'], + $f['mime'] + ); + } + + return $result; + } + + /** + * Normalize HTTP protocol version to valid values + * + * @param string $version + * @return string + */ + private static function fetchProtocolVersion(string $version): string + { + $v = substr($version, 5); + + if ($v === '2.0') { + return '2'; + } + + // Fallback for values outside of valid protocol versions + if (!in_array($v, static::$allowedVersions, true)) { + return '1.1'; + } + + return $v; + } +} diff --git a/src/Worker.php b/src/Worker.php old mode 100755 new mode 100644 diff --git a/src/WorkerInterface.php b/src/WorkerInterface.php deleted file mode 100644 index e69de29b..00000000 diff --git a/static_pool.go b/static_pool.go index 4e49b212..d5511018 100755 --- a/static_pool.go +++ b/static_pool.go @@ -54,6 +54,9 @@ type StaticPool struct { // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) { const op = errors.Op("NewPool") + if factory == nil { + return nil, errors.E(op, errors.Str("no factory initialized")) + } cfg.InitDefaults() if cfg.Debug { -- cgit v1.2.3