diff options
-rw-r--r-- | LICENSE | 21 | ||||
-rw-r--r-- | README.md | 20 | ||||
-rw-r--r-- | balancer.go | 63 | ||||
-rw-r--r-- | commands.go | 27 | ||||
-rw-r--r-- | composer.json | 29 | ||||
-rw-r--r-- | composer.lock | 1518 | ||||
-rw-r--r-- | config.go | 19 | ||||
-rw-r--r-- | error.go | 17 | ||||
-rw-r--r-- | error_test.go | 16 | ||||
-rw-r--r-- | factory.go | 12 | ||||
-rw-r--r-- | pipe_factory.go | 46 | ||||
-rw-r--r-- | pool.go | 189 | ||||
-rw-r--r-- | socket_factory.go | 138 | ||||
-rw-r--r-- | source/Exceptions/RoadRunnerException.php | 13 | ||||
-rw-r--r-- | source/Worker.php | 162 | ||||
-rw-r--r-- | state.go | 39 | ||||
-rw-r--r-- | tests/broken-client.php | 17 | ||||
-rw-r--r-- | tests/echo-client.php | 20 | ||||
-rw-r--r-- | tests/error-client.php | 16 | ||||
-rw-r--r-- | worker.go | 168 | ||||
-rw-r--r-- | worker_test.go | 130 |
21 files changed, 2680 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..d78565f0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 SpiralScout + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 00000000..05d8cd3a --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +RoadRunner +========== +Embeddable PHP application server library for Golang. + +Features: +-------- +- load balancer, process manager and task pipeline in one library +- worker pool hot-wrap +- build for multiple frontends (queue, rest, psr-7, async php, etc) +- works over TPC, unix sockets and standard pipes +- controlled worker termination +- timeout management +- payload context +- protocol, job and worker level error management +- very fast (~200k calls per second on Ryzen 1700X over 17 threads) +- works on Windows + +License: +-------- +The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
\ No newline at end of file diff --git a/balancer.go b/balancer.go new file mode 100644 index 00000000..29fcd8d5 --- /dev/null +++ b/balancer.go @@ -0,0 +1,63 @@ +package roadrunner + +import ( + "os/exec" + "sync" +) + +// Balancer provides ability to perform hot-swap between 2 worker pools. +type Balancer struct { + mu sync.Mutex // protects pool hot swapping + pool *Pool // pool to work for user commands +} + +// Spawn initiates underlying pool of workers and replaced old one. +func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) error { + b.mu.Lock() + defer b.mu.Unlock() + + var ( + err error + old *Pool + ) + + old = b.pool + if b.pool, err = NewPool(cmd, factory, cfg); err != nil { + return err + } + + if old != nil { + go func() { + old.Close() + }() + } + + return nil +} + +// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is +// being destroyed. +func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { + b.mu.Lock() + pool := b.pool + b.mu.Unlock() + + return pool.Execute(payload, ctx) +} + +// Workers return list of active workers. +func (b *Balancer) Workers() []*Worker { + b.mu.Lock() + defer b.mu.Unlock() + + return b.pool.Workers() +} + +// Close closes underlying pool. +func (b *Balancer) Close() { + b.mu.Lock() + defer b.mu.Unlock() + + b.pool.Close() + b.pool = nil +} diff --git a/commands.go b/commands.go new file mode 100644 index 00000000..600af16a --- /dev/null +++ b/commands.go @@ -0,0 +1,27 @@ +package roadrunner + +import ( + "encoding/json" + "github.com/spiral/goridge" +) + +// TerminateCommand must stop underlying process. +type TerminateCommand struct { + Terminate bool `json:"terminate"` +} + +// PidCommand send greeting message between processes in json format. +type PidCommand struct { + Pid int `json:"pid"` + Parent int `json:"parent,omitempty"` +} + +// sends control message via relay using JSON encoding +func sendCommand(rl goridge.Relay, command interface{}) error { + bin, err := json.Marshal(command) + if err != nil { + return err + } + + return rl.Send(bin, goridge.PayloadControl) +}
\ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 00000000..d795f9ae --- /dev/null +++ b/composer.json @@ -0,0 +1,29 @@ +{ + "name": "spiral/roadrunner", + "type": "goridge", + "description": "High-performance PHP process supervisor and load balancer written in Go", + "license": "MIT", + "authors": [ + { + "name": "Anton Titov / Wolfy-J", + "email": "[email protected]" + } + ], + "require": { + "php": "^7.0", + "spiral/goridge": "^2.0" + }, + "require-dev": { + "phpunit/phpunit": "~6.0" + }, + "autoload": { + "psr-4": { + "Spiral\\RoadRunner\\": "source/" + } + }, + "autoload-dev": { + "psr-4": { + "Spiral\\Tests\\": "tests/Cases/" + } + } +}
\ No newline at end of file diff --git a/composer.lock b/composer.lock new file mode 100644 index 00000000..7646e668 --- /dev/null +++ b/composer.lock @@ -0,0 +1,1518 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", + "This file is @generated automatically" + ], + "hash": "a05da84ea1e9e80a52fc218bb4d952c1", + "content-hash": "2deb68e3347a18c68289e209971b087d", + "packages": [ + { + "name": "spiral/goridge", + "version": "v2.0.0", + "source": { + "type": "git", + "url": "https://github.com/spiral/goridge.git", + "reference": "f60182bef09f1e45a47908e1f0fb080affdcab81" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/spiral/goridge/zipball/f60182bef09f1e45a47908e1f0fb080affdcab81", + "reference": "f60182bef09f1e45a47908e1f0fb080affdcab81", + "shasum": "" + }, + "require": { + "php": ">=7.0" + }, + "require-dev": { + "phpunit/phpunit": "~6.0" + }, + "type": "goridge", + "autoload": { + "psr-4": { + "Spiral\\Goridge\\": "source/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Anton Titov / Wolfy-J", + "email": "[email protected]" + } + ], + "description": "High-performance PHP-to-Golang RPC bridge", + "time": "2017-11-17 11:07:26" + } + ], + "packages-dev": [ + { + "name": "doctrine/instantiator", + "version": "1.1.0", + "source": { + "type": "git", + "url": "https://github.com/doctrine/instantiator.git", + "reference": "185b8868aa9bf7159f5f953ed5afb2d7fcdc3bda" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/doctrine/instantiator/zipball/185b8868aa9bf7159f5f953ed5afb2d7fcdc3bda", + "reference": "185b8868aa9bf7159f5f953ed5afb2d7fcdc3bda", + "shasum": "" + }, + "require": { + "php": "^7.1" + }, + "require-dev": { + "athletic/athletic": "~0.1.8", + "ext-pdo": "*", + "ext-phar": "*", + "phpunit/phpunit": "^6.2.3", + "squizlabs/php_codesniffer": "^3.0.2" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.2.x-dev" + } + }, + "autoload": { + "psr-4": { + "Doctrine\\Instantiator\\": "src/Doctrine/Instantiator/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Marco Pivetta", + "email": "[email protected]", + "homepage": "http://ocramius.github.com/" + } + ], + "description": "A small, lightweight utility to instantiate objects in PHP without invoking their constructors", + "homepage": "https://github.com/doctrine/instantiator", + "keywords": [ + "constructor", + "instantiate" + ], + "time": "2017-07-22 11:58:36" + }, + { + "name": "myclabs/deep-copy", + "version": "1.7.0", + "source": { + "type": "git", + "url": "https://github.com/myclabs/DeepCopy.git", + "reference": "3b8a3a99ba1f6a3952ac2747d989303cbd6b7a3e" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/3b8a3a99ba1f6a3952ac2747d989303cbd6b7a3e", + "reference": "3b8a3a99ba1f6a3952ac2747d989303cbd6b7a3e", + "shasum": "" + }, + "require": { + "php": "^5.6 || ^7.0" + }, + "require-dev": { + "doctrine/collections": "^1.0", + "doctrine/common": "^2.6", + "phpunit/phpunit": "^4.1" + }, + "type": "library", + "autoload": { + "psr-4": { + "DeepCopy\\": "src/DeepCopy/" + }, + "files": [ + "src/DeepCopy/deep_copy.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Create deep copies (clones) of your objects", + "keywords": [ + "clone", + "copy", + "duplicate", + "object", + "object graph" + ], + "time": "2017-10-19 19:58:43" + }, + { + "name": "phar-io/manifest", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/phar-io/manifest.git", + "reference": "2df402786ab5368a0169091f61a7c1e0eb6852d0" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phar-io/manifest/zipball/2df402786ab5368a0169091f61a7c1e0eb6852d0", + "reference": "2df402786ab5368a0169091f61a7c1e0eb6852d0", + "shasum": "" + }, + "require": { + "ext-dom": "*", + "ext-phar": "*", + "phar-io/version": "^1.0.1", + "php": "^5.6 || ^7.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Arne Blankerts", + "email": "[email protected]", + "role": "Developer" + }, + { + "name": "Sebastian Heuer", + "email": "[email protected]", + "role": "Developer" + }, + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "Developer" + } + ], + "description": "Component for reading phar.io manifest information from a PHP Archive (PHAR)", + "time": "2017-03-05 18:14:27" + }, + { + "name": "phar-io/version", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/phar-io/version.git", + "reference": "a70c0ced4be299a63d32fa96d9281d03e94041df" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phar-io/version/zipball/a70c0ced4be299a63d32fa96d9281d03e94041df", + "reference": "a70c0ced4be299a63d32fa96d9281d03e94041df", + "shasum": "" + }, + "require": { + "php": "^5.6 || ^7.0" + }, + "type": "library", + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Arne Blankerts", + "email": "[email protected]", + "role": "Developer" + }, + { + "name": "Sebastian Heuer", + "email": "[email protected]", + "role": "Developer" + }, + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "Developer" + } + ], + "description": "Library for handling version information and constraints", + "time": "2017-03-05 17:38:23" + }, + { + "name": "phpdocumentor/reflection-common", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/phpDocumentor/ReflectionCommon.git", + "reference": "21bdeb5f65d7ebf9f43b1b25d404f87deab5bfb6" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phpDocumentor/ReflectionCommon/zipball/21bdeb5f65d7ebf9f43b1b25d404f87deab5bfb6", + "reference": "21bdeb5f65d7ebf9f43b1b25d404f87deab5bfb6", + "shasum": "" + }, + "require": { + "php": ">=5.5" + }, + "require-dev": { + "phpunit/phpunit": "^4.6" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "phpDocumentor\\Reflection\\": [ + "src" + ] + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Jaap van Otterdijk", + "email": "[email protected]" + } + ], + "description": "Common reflection classes used by phpdocumentor to reflect the code structure", + "homepage": "http://www.phpdoc.org", + "keywords": [ + "FQSEN", + "phpDocumentor", + "phpdoc", + "reflection", + "static analysis" + ], + "time": "2017-09-11 18:02:19" + }, + { + "name": "phpdocumentor/reflection-docblock", + "version": "4.2.0", + "source": { + "type": "git", + "url": "https://github.com/phpDocumentor/ReflectionDocBlock.git", + "reference": "66465776cfc249844bde6d117abff1d22e06c2da" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phpDocumentor/ReflectionDocBlock/zipball/66465776cfc249844bde6d117abff1d22e06c2da", + "reference": "66465776cfc249844bde6d117abff1d22e06c2da", + "shasum": "" + }, + "require": { + "php": "^7.0", + "phpdocumentor/reflection-common": "^1.0.0", + "phpdocumentor/type-resolver": "^0.4.0", + "webmozart/assert": "^1.0" + }, + "require-dev": { + "doctrine/instantiator": "~1.0.5", + "mockery/mockery": "^1.0", + "phpunit/phpunit": "^6.4" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "4.x-dev" + } + }, + "autoload": { + "psr-4": { + "phpDocumentor\\Reflection\\": [ + "src/" + ] + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Mike van Riel", + "email": "[email protected]" + } + ], + "description": "With this component, a library can provide support for annotations via DocBlocks or otherwise retrieve information that is embedded in a DocBlock.", + "time": "2017-11-27 17:38:31" + }, + { + "name": "phpdocumentor/type-resolver", + "version": "0.4.0", + "source": { + "type": "git", + "url": "https://github.com/phpDocumentor/TypeResolver.git", + "reference": "9c977708995954784726e25d0cd1dddf4e65b0f7" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phpDocumentor/TypeResolver/zipball/9c977708995954784726e25d0cd1dddf4e65b0f7", + "reference": "9c977708995954784726e25d0cd1dddf4e65b0f7", + "shasum": "" + }, + "require": { + "php": "^5.5 || ^7.0", + "phpdocumentor/reflection-common": "^1.0" + }, + "require-dev": { + "mockery/mockery": "^0.9.4", + "phpunit/phpunit": "^5.2||^4.8.24" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "phpDocumentor\\Reflection\\": [ + "src/" + ] + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Mike van Riel", + "email": "[email protected]" + } + ], + "time": "2017-07-14 14:27:02" + }, + { + "name": "phpspec/prophecy", + "version": "1.7.3", + "source": { + "type": "git", + "url": "https://github.com/phpspec/prophecy.git", + "reference": "e4ed002c67da8eceb0eb8ddb8b3847bb53c5c2bf" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phpspec/prophecy/zipball/e4ed002c67da8eceb0eb8ddb8b3847bb53c5c2bf", + "reference": "e4ed002c67da8eceb0eb8ddb8b3847bb53c5c2bf", + "shasum": "" + }, + "require": { + "doctrine/instantiator": "^1.0.2", + "php": "^5.3|^7.0", + "phpdocumentor/reflection-docblock": "^2.0|^3.0.2|^4.0", + "sebastian/comparator": "^1.1|^2.0", + "sebastian/recursion-context": "^1.0|^2.0|^3.0" + }, + "require-dev": { + "phpspec/phpspec": "^2.5|^3.2", + "phpunit/phpunit": "^4.8.35 || ^5.7" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.7.x-dev" + } + }, + "autoload": { + "psr-0": { + "Prophecy\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Konstantin Kudryashov", + "email": "[email protected]", + "homepage": "http://everzet.com" + }, + { + "name": "Marcello Duarte", + "email": "[email protected]" + } + ], + "description": "Highly opinionated mocking framework for PHP 5.3+", + "homepage": "https://github.com/phpspec/prophecy", + "keywords": [ + "Double", + "Dummy", + "fake", + "mock", + "spy", + "stub" + ], + "time": "2017-11-24 13:59:53" + }, + { + "name": "phpunit/php-code-coverage", + "version": "5.3.0", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-code-coverage.git", + "reference": "661f34d0bd3f1a7225ef491a70a020ad23a057a1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/661f34d0bd3f1a7225ef491a70a020ad23a057a1", + "reference": "661f34d0bd3f1a7225ef491a70a020ad23a057a1", + "shasum": "" + }, + "require": { + "ext-dom": "*", + "ext-xmlwriter": "*", + "php": "^7.0", + "phpunit/php-file-iterator": "^1.4.2", + "phpunit/php-text-template": "^1.2.1", + "phpunit/php-token-stream": "^2.0.1", + "sebastian/code-unit-reverse-lookup": "^1.0.1", + "sebastian/environment": "^3.0", + "sebastian/version": "^2.0.1", + "theseer/tokenizer": "^1.1" + }, + "require-dev": { + "phpunit/phpunit": "^6.0" + }, + "suggest": { + "ext-xdebug": "^2.5.5" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "5.3.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "Library that provides collection, processing, and rendering functionality for PHP code coverage information.", + "homepage": "https://github.com/sebastianbergmann/php-code-coverage", + "keywords": [ + "coverage", + "testing", + "xunit" + ], + "time": "2017-12-06 09:29:45" + }, + { + "name": "phpunit/php-file-iterator", + "version": "1.4.5", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-file-iterator.git", + "reference": "730b01bc3e867237eaac355e06a36b85dd93a8b4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/730b01bc3e867237eaac355e06a36b85dd93a8b4", + "reference": "730b01bc3e867237eaac355e06a36b85dd93a8b4", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.4.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "FilterIterator implementation that filters files based on a list of suffixes.", + "homepage": "https://github.com/sebastianbergmann/php-file-iterator/", + "keywords": [ + "filesystem", + "iterator" + ], + "time": "2017-11-27 13:52:08" + }, + { + "name": "phpunit/php-text-template", + "version": "1.2.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-text-template.git", + "reference": "31f8b717e51d9a2afca6c9f046f5d69fc27c8686" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-text-template/zipball/31f8b717e51d9a2afca6c9f046f5d69fc27c8686", + "reference": "31f8b717e51d9a2afca6c9f046f5d69fc27c8686", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "Simple template engine.", + "homepage": "https://github.com/sebastianbergmann/php-text-template/", + "keywords": [ + "template" + ], + "time": "2015-06-21 13:50:34" + }, + { + "name": "phpunit/php-timer", + "version": "1.0.9", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-timer.git", + "reference": "3dcf38ca72b158baf0bc245e9184d3fdffa9c46f" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-timer/zipball/3dcf38ca72b158baf0bc245e9184d3fdffa9c46f", + "reference": "3dcf38ca72b158baf0bc245e9184d3fdffa9c46f", + "shasum": "" + }, + "require": { + "php": "^5.3.3 || ^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^4.8.35 || ^5.7 || ^6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "Utility class for timing", + "homepage": "https://github.com/sebastianbergmann/php-timer/", + "keywords": [ + "timer" + ], + "time": "2017-02-26 11:10:40" + }, + { + "name": "phpunit/php-token-stream", + "version": "2.0.2", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-token-stream.git", + "reference": "791198a2c6254db10131eecfe8c06670700904db" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-token-stream/zipball/791198a2c6254db10131eecfe8c06670700904db", + "reference": "791198a2c6254db10131eecfe8c06670700904db", + "shasum": "" + }, + "require": { + "ext-tokenizer": "*", + "php": "^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.2.4" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Wrapper around PHP's tokenizer extension.", + "homepage": "https://github.com/sebastianbergmann/php-token-stream/", + "keywords": [ + "tokenizer" + ], + "time": "2017-11-27 05:48:46" + }, + { + "name": "phpunit/phpunit", + "version": "6.5.5", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/phpunit.git", + "reference": "83d27937a310f2984fd575686138597147bdc7df" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/83d27937a310f2984fd575686138597147bdc7df", + "reference": "83d27937a310f2984fd575686138597147bdc7df", + "shasum": "" + }, + "require": { + "ext-dom": "*", + "ext-json": "*", + "ext-libxml": "*", + "ext-mbstring": "*", + "ext-xml": "*", + "myclabs/deep-copy": "^1.6.1", + "phar-io/manifest": "^1.0.1", + "phar-io/version": "^1.0", + "php": "^7.0", + "phpspec/prophecy": "^1.7", + "phpunit/php-code-coverage": "^5.3", + "phpunit/php-file-iterator": "^1.4.3", + "phpunit/php-text-template": "^1.2.1", + "phpunit/php-timer": "^1.0.9", + "phpunit/phpunit-mock-objects": "^5.0.5", + "sebastian/comparator": "^2.1", + "sebastian/diff": "^2.0", + "sebastian/environment": "^3.1", + "sebastian/exporter": "^3.1", + "sebastian/global-state": "^2.0", + "sebastian/object-enumerator": "^3.0.3", + "sebastian/resource-operations": "^1.0", + "sebastian/version": "^2.0.1" + }, + "conflict": { + "phpdocumentor/reflection-docblock": "3.0.2", + "phpunit/dbunit": "<3.0" + }, + "require-dev": { + "ext-pdo": "*" + }, + "suggest": { + "ext-xdebug": "*", + "phpunit/php-invoker": "^1.1" + }, + "bin": [ + "phpunit" + ], + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "6.5.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "The PHP Unit Testing framework.", + "homepage": "https://phpunit.de/", + "keywords": [ + "phpunit", + "testing", + "xunit" + ], + "time": "2017-12-17 06:31:19" + }, + { + "name": "phpunit/phpunit-mock-objects", + "version": "5.0.5", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/phpunit-mock-objects.git", + "reference": "283b9f4f670e3a6fd6c4ff95c51a952eb5c75933" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit-mock-objects/zipball/283b9f4f670e3a6fd6c4ff95c51a952eb5c75933", + "reference": "283b9f4f670e3a6fd6c4ff95c51a952eb5c75933", + "shasum": "" + }, + "require": { + "doctrine/instantiator": "^1.0.5", + "php": "^7.0", + "phpunit/php-text-template": "^1.2.1", + "sebastian/exporter": "^3.1" + }, + "conflict": { + "phpunit/phpunit": "<6.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.5" + }, + "suggest": { + "ext-soap": "*" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "5.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "Mock Object library for PHPUnit", + "homepage": "https://github.com/sebastianbergmann/phpunit-mock-objects/", + "keywords": [ + "mock", + "xunit" + ], + "time": "2017-12-10 08:01:53" + }, + { + "name": "sebastian/code-unit-reverse-lookup", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/code-unit-reverse-lookup.git", + "reference": "4419fcdb5eabb9caa61a27c7a1db532a6b55dd18" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/code-unit-reverse-lookup/zipball/4419fcdb5eabb9caa61a27c7a1db532a6b55dd18", + "reference": "4419fcdb5eabb9caa61a27c7a1db532a6b55dd18", + "shasum": "" + }, + "require": { + "php": "^5.6 || ^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^5.7 || ^6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Looks up which function or method a line of code belongs to", + "homepage": "https://github.com/sebastianbergmann/code-unit-reverse-lookup/", + "time": "2017-03-04 06:30:41" + }, + { + "name": "sebastian/comparator", + "version": "2.1.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/comparator.git", + "reference": "b11c729f95109b56a0fe9650c6a63a0fcd8c439f" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/comparator/zipball/b11c729f95109b56a0fe9650c6a63a0fcd8c439f", + "reference": "b11c729f95109b56a0fe9650c6a63a0fcd8c439f", + "shasum": "" + }, + "require": { + "php": "^7.0", + "sebastian/diff": "^2.0", + "sebastian/exporter": "^3.1" + }, + "require-dev": { + "phpunit/phpunit": "^6.4" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.1.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Jeff Welch", + "email": "[email protected]" + }, + { + "name": "Volker Dusch", + "email": "[email protected]" + }, + { + "name": "Bernhard Schussek", + "email": "[email protected]" + }, + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Provides the functionality to compare PHP values for equality", + "homepage": "https://github.com/sebastianbergmann/comparator", + "keywords": [ + "comparator", + "compare", + "equality" + ], + "time": "2017-12-22 14:50:35" + }, + { + "name": "sebastian/diff", + "version": "2.0.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/diff.git", + "reference": "347c1d8b49c5c3ee30c7040ea6fc446790e6bddd" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/347c1d8b49c5c3ee30c7040ea6fc446790e6bddd", + "reference": "347c1d8b49c5c3ee30c7040ea6fc446790e6bddd", + "shasum": "" + }, + "require": { + "php": "^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.2" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Kore Nordmann", + "email": "[email protected]" + }, + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Diff implementation", + "homepage": "https://github.com/sebastianbergmann/diff", + "keywords": [ + "diff" + ], + "time": "2017-08-03 08:09:46" + }, + { + "name": "sebastian/environment", + "version": "3.1.0", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/environment.git", + "reference": "cd0871b3975fb7fc44d11314fd1ee20925fce4f5" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/environment/zipball/cd0871b3975fb7fc44d11314fd1ee20925fce4f5", + "reference": "cd0871b3975fb7fc44d11314fd1ee20925fce4f5", + "shasum": "" + }, + "require": { + "php": "^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.1" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.1.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Provides functionality to handle HHVM/PHP environments", + "homepage": "http://www.github.com/sebastianbergmann/environment", + "keywords": [ + "Xdebug", + "environment", + "hhvm" + ], + "time": "2017-07-01 08:51:00" + }, + { + "name": "sebastian/exporter", + "version": "3.1.0", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/exporter.git", + "reference": "234199f4528de6d12aaa58b612e98f7d36adb937" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/234199f4528de6d12aaa58b612e98f7d36adb937", + "reference": "234199f4528de6d12aaa58b612e98f7d36adb937", + "shasum": "" + }, + "require": { + "php": "^7.0", + "sebastian/recursion-context": "^3.0" + }, + "require-dev": { + "ext-mbstring": "*", + "phpunit/phpunit": "^6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.1.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Jeff Welch", + "email": "[email protected]" + }, + { + "name": "Volker Dusch", + "email": "[email protected]" + }, + { + "name": "Bernhard Schussek", + "email": "[email protected]" + }, + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + }, + { + "name": "Adam Harvey", + "email": "[email protected]" + } + ], + "description": "Provides the functionality to export PHP variables for visualization", + "homepage": "http://www.github.com/sebastianbergmann/exporter", + "keywords": [ + "export", + "exporter" + ], + "time": "2017-04-03 13:19:02" + }, + { + "name": "sebastian/global-state", + "version": "2.0.0", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/global-state.git", + "reference": "e8ba02eed7bbbb9e59e43dedd3dddeff4a56b0c4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/e8ba02eed7bbbb9e59e43dedd3dddeff4a56b0c4", + "reference": "e8ba02eed7bbbb9e59e43dedd3dddeff4a56b0c4", + "shasum": "" + }, + "require": { + "php": "^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.0" + }, + "suggest": { + "ext-uopz": "*" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Snapshotting of global state", + "homepage": "http://www.github.com/sebastianbergmann/global-state", + "keywords": [ + "global state" + ], + "time": "2017-04-27 15:39:26" + }, + { + "name": "sebastian/object-enumerator", + "version": "3.0.3", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/object-enumerator.git", + "reference": "7cfd9e65d11ffb5af41198476395774d4c8a84c5" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/object-enumerator/zipball/7cfd9e65d11ffb5af41198476395774d4c8a84c5", + "reference": "7cfd9e65d11ffb5af41198476395774d4c8a84c5", + "shasum": "" + }, + "require": { + "php": "^7.0", + "sebastian/object-reflector": "^1.1.1", + "sebastian/recursion-context": "^3.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Traverses array structures and object graphs to enumerate all referenced objects", + "homepage": "https://github.com/sebastianbergmann/object-enumerator/", + "time": "2017-08-03 12:35:26" + }, + { + "name": "sebastian/object-reflector", + "version": "1.1.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/object-reflector.git", + "reference": "773f97c67f28de00d397be301821b06708fca0be" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/object-reflector/zipball/773f97c67f28de00d397be301821b06708fca0be", + "reference": "773f97c67f28de00d397be301821b06708fca0be", + "shasum": "" + }, + "require": { + "php": "^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.1-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Allows reflection of object attributes, including inherited and non-public ones", + "homepage": "https://github.com/sebastianbergmann/object-reflector/", + "time": "2017-03-29 09:07:27" + }, + { + "name": "sebastian/recursion-context", + "version": "3.0.0", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/recursion-context.git", + "reference": "5b0cd723502bac3b006cbf3dbf7a1e3fcefe4fa8" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/recursion-context/zipball/5b0cd723502bac3b006cbf3dbf7a1e3fcefe4fa8", + "reference": "5b0cd723502bac3b006cbf3dbf7a1e3fcefe4fa8", + "shasum": "" + }, + "require": { + "php": "^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Jeff Welch", + "email": "[email protected]" + }, + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + }, + { + "name": "Adam Harvey", + "email": "[email protected]" + } + ], + "description": "Provides functionality to recursively process PHP variables", + "homepage": "http://www.github.com/sebastianbergmann/recursion-context", + "time": "2017-03-03 06:23:57" + }, + { + "name": "sebastian/resource-operations", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/resource-operations.git", + "reference": "ce990bb21759f94aeafd30209e8cfcdfa8bc3f52" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/resource-operations/zipball/ce990bb21759f94aeafd30209e8cfcdfa8bc3f52", + "reference": "ce990bb21759f94aeafd30209e8cfcdfa8bc3f52", + "shasum": "" + }, + "require": { + "php": ">=5.6.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]" + } + ], + "description": "Provides a list of PHP built-in functions that operate on resources", + "homepage": "https://www.github.com/sebastianbergmann/resource-operations", + "time": "2015-07-28 20:34:47" + }, + { + "name": "sebastian/version", + "version": "2.0.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/version.git", + "reference": "99732be0ddb3361e16ad77b68ba41efc8e979019" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/version/zipball/99732be0ddb3361e16ad77b68ba41efc8e979019", + "reference": "99732be0ddb3361e16ad77b68ba41efc8e979019", + "shasum": "" + }, + "require": { + "php": ">=5.6" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "[email protected]", + "role": "lead" + } + ], + "description": "Library that helps with managing the version number of Git-hosted PHP projects", + "homepage": "https://github.com/sebastianbergmann/version", + "time": "2016-10-03 07:35:21" + }, + { + "name": "theseer/tokenizer", + "version": "1.1.0", + "source": { + "type": "git", + "url": "https://github.com/theseer/tokenizer.git", + "reference": "cb2f008f3f05af2893a87208fe6a6c4985483f8b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/theseer/tokenizer/zipball/cb2f008f3f05af2893a87208fe6a6c4985483f8b", + "reference": "cb2f008f3f05af2893a87208fe6a6c4985483f8b", + "shasum": "" + }, + "require": { + "ext-dom": "*", + "ext-tokenizer": "*", + "ext-xmlwriter": "*", + "php": "^7.0" + }, + "type": "library", + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Arne Blankerts", + "email": "[email protected]", + "role": "Developer" + } + ], + "description": "A small library for converting tokenized PHP source code into XML and potentially other formats", + "time": "2017-04-07 12:08:54" + }, + { + "name": "webmozart/assert", + "version": "1.2.0", + "source": { + "type": "git", + "url": "https://github.com/webmozart/assert.git", + "reference": "2db61e59ff05fe5126d152bd0655c9ea113e550f" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/webmozart/assert/zipball/2db61e59ff05fe5126d152bd0655c9ea113e550f", + "reference": "2db61e59ff05fe5126d152bd0655c9ea113e550f", + "shasum": "" + }, + "require": { + "php": "^5.3.3 || ^7.0" + }, + "require-dev": { + "phpunit/phpunit": "^4.6", + "sebastian/version": "^1.0.1" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.3-dev" + } + }, + "autoload": { + "psr-4": { + "Webmozart\\Assert\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Bernhard Schussek", + "email": "[email protected]" + } + ], + "description": "Assertions to validate method input/output with nice error messages.", + "keywords": [ + "assert", + "check", + "validate" + ], + "time": "2016-11-23 20:04:58" + } + ], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": { + "php": "^7.0" + }, + "platform-dev": [] +} diff --git a/config.go b/config.go new file mode 100644 index 00000000..e5d78d49 --- /dev/null +++ b/config.go @@ -0,0 +1,19 @@ +package roadrunner + +import "time" + +// Config defines basic behaviour of worker creation and handling process. +type Config struct { + // MaxWorkers defines how many sub-processes can be run at once. This value might be doubled by Balancer while hot-swap. + MaxWorkers uint64 + + // MaxExecutions defines how many executions is allowed for the worker until it's destruction. Set 1 to create new process + // for each new task, 0 to let worker handle as many tasks as it can. + MaxExecutions uint64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to be freed to handle the task. + AllocateTimeout time.Duration + + // DestroyOnError when set to true workers will be destructed after any JobError. + DestroyOnError bool +} diff --git a/error.go b/error.go new file mode 100644 index 00000000..a8f9e539 --- /dev/null +++ b/error.go @@ -0,0 +1,17 @@ +package roadrunner + +// WorkerError is communication/process error. +type WorkerError string + +// Error converts error context to string +func (we WorkerError) Error() string { + return string(we) +} + +// JobError is job level error (no worker halt) +type JobError []byte + +// Error converts error context to string +func (je JobError) Error() string { + return string(je) +} diff --git a/error_test.go b/error_test.go new file mode 100644 index 00000000..e8fa62b4 --- /dev/null +++ b/error_test.go @@ -0,0 +1,16 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestWorkerError_Error(t *testing.T) { + e := WorkerError("error") + assert.Equal(t, "error", e.Error()) +} + +func TestJobError_Error(t *testing.T) { + e := JobError("error") + assert.Equal(t, "error", e.Error()) +} diff --git a/factory.go b/factory.go new file mode 100644 index 00000000..e58d9364 --- /dev/null +++ b/factory.go @@ -0,0 +1,12 @@ +package roadrunner + +import "os/exec" + +// Factory is responsible of wrapping given command into tasks worker. +type Factory interface { + // NewWorker creates new worker process based on given process. + NewWorker(cmd *exec.Cmd) (w *Worker, err error) + + // Close closes all open factory descriptors. + Close() error +} diff --git a/pipe_factory.go b/pipe_factory.go new file mode 100644 index 00000000..ce32dacc --- /dev/null +++ b/pipe_factory.go @@ -0,0 +1,46 @@ +package roadrunner + +import ( + "github.com/spiral/goridge" + "os/exec" +) + +// PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes). +type PipeFactory struct { +} + +// NewPipeFactory returns new factory instance and starts listening +func NewPipeFactory() *PipeFactory { + return &PipeFactory{} +} + +// NewWorker creates worker and connects it to appropriate relay or returns error +func (f *PipeFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { + w, err = NewWorker(cmd) + if err != nil { + return nil, err + } + + in, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + + out, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + + if err := w.Start(); err != nil { + return nil, err + } + + w.attach(goridge.NewPipeRelay(in, out)) + + return w, nil +} + +// Close closes all open factory descriptors. +func (f *PipeFactory) Close() error { + return nil +} diff --git a/pool.go b/pool.go new file mode 100644 index 00000000..50f14e4e --- /dev/null +++ b/pool.go @@ -0,0 +1,189 @@ +package roadrunner + +import ( + "fmt" + "os/exec" + "sync" + "sync/atomic" + "time" +) + +const ( + // ContextTerminate must be sent by worker in control payload if worker want to die. + ContextTerminate = "TERMINATE" +) + +// Pool controls worker creation, destruction and task routing. +type Pool struct { + cfg Config // pool behaviour + cmd func() *exec.Cmd // worker command creator + factory Factory // creates and connects to workers + numWorkers uint64 // current number of tasks workers + tasks sync.WaitGroup // counts all tasks executions + mua sync.Mutex // protects worker allocation + muw sync.RWMutex // protects state of worker list + workers []*Worker // all registered workers + free chan *Worker // freed workers +} + +// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker. +func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) { + p := &Pool{ + cfg: cfg, + cmd: cmd, + factory: factory, + workers: make([]*Worker, 0, cfg.MaxWorkers), + free: make(chan *Worker, cfg.MaxWorkers), + } + + // to test if worker ready + w, err := p.createWorker() + if err != nil { + return nil, err + } + + p.free <- w + return p, nil +} + +// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is +// being destroyed. +func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { + p.tasks.Add(1) + defer p.tasks.Done() + + w, err := p.allocateWorker() + if err != nil { + return nil, nil, err + } + + if resp, rCtx, err = w.Execute(payload, ctx); err != nil { + if !p.cfg.DestroyOnError { + if err, jobError := err.(JobError); jobError { + p.free <- w + return nil, nil, err + } + } + + // worker level error + p.destroyWorker(w) + + return nil, nil, err + } + + // controlled destruction + if len(resp) == 0 && string(rCtx) == ContextTerminate { + p.destroyWorker(w) + go func() { + //immediate refill + if w, err := p.createWorker(); err != nil { + p.free <- w + } + }() + + return p.Execute(payload, ctx) + } + + if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions { + p.destroyWorker(w) + } else { + p.free <- w + } + + return resp, rCtx, nil +} + +// Config returns associated pool configuration. +func (p *Pool) Config() Config { + return p.cfg +} + +// Workers returns workers associated with the pool. +func (p *Pool) Workers() (workers []*Worker) { + p.muw.RLock() + defer p.muw.RUnlock() + + for _, w := range p.workers { + workers = append(workers, w) + } + + return workers +} + +// Close all underlying workers (but let them to complete the task). +func (p *Pool) Close() { + p.tasks.Wait() + + var wg sync.WaitGroup + for _, w := range p.Workers() { + wg.Add(1) + go func(w *Worker) { + defer wg.Done() + p.destroyWorker(w) + }(w) + } + + wg.Wait() +} + +// finds free worker in a given time interval or creates new if allowed. +func (p *Pool) allocateWorker() (*Worker, error) { + p.mua.Lock() + defer p.mua.Unlock() + + select { + case w := <-p.free: + // we already have free worker + return w, nil + default: + if p.numWorkers < p.cfg.MaxWorkers { + return p.createWorker() + } + + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout) + case w := <-p.free: + timeout.Stop() + return w, nil + } + } +} + +// destroy and remove worker from the pool. +func (p *Pool) destroyWorker(w *Worker) { + atomic.AddUint64(&p.numWorkers, ^uint64(0)) + + go func() { + w.Stop() + + p.muw.Lock() + defer p.muw.Unlock() + + for i, wc := range p.workers { + if wc == w { + p.workers = p.workers[:i+1] + break + } + } + }() +} + +// creates new worker (must be called in a locked state). +func (p *Pool) createWorker() (*Worker, error) { + w, err := p.factory.NewWorker(p.cmd()) + if err != nil { + return nil, err + } + + atomic.AddUint64(&p.numWorkers, 1) + + go func() { + p.muw.Lock() + defer p.muw.Unlock() + p.workers = append(p.workers, w) + }() + + return w, nil +} diff --git a/socket_factory.go b/socket_factory.go new file mode 100644 index 00000000..5d1b488b --- /dev/null +++ b/socket_factory.go @@ -0,0 +1,138 @@ +package roadrunner + +import ( + "encoding/json" + "fmt" + "github.com/spiral/goridge" + "net" + "os" + "os/exec" + "sync" + "time" +) + +// SocketFactory connects to external workers using socket server. +type SocketFactory struct { + ls net.Listener // listens for incoming connections from underlying processes + tout time.Duration // connection timeout + mu sync.Mutex // protects socket mapping + wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association +} + +// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory +// should wait for incoming relay connection +func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { + f := &SocketFactory{ + ls: ls, + tout: tout, + wait: make(map[int]chan *goridge.SocketRelay), + } + + go f.listen() + return f +} + +// NewWorker creates worker and connects it to appropriate relay or returns error +func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { + w, err = NewWorker(cmd) + if err != nil { + return nil, err + } + + if err := w.Start(); err != nil { + return nil, err + } + + w.Pid = &w.cmd.Process.Pid + if w.Pid == nil { + return nil, fmt.Errorf("can't to start worker %s", w) + } + + rl, err := f.waitRelay(*w.Pid, f.tout) + if err != nil { + return nil, fmt.Errorf("can't connect to worker %s: %s", w, err) + } + + w.attach(rl) + + return w, nil +} + +// Close closes all open factory descriptors. +func (f *SocketFactory) Close() error { + return f.ls.Close() +} + +// listen for incoming wait and associate sockets with active workers +func (f *SocketFactory) listen() { + for { + conn, err := f.ls.Accept() + if err != nil { + return + } + + rl := goridge.NewSocketRelay(conn) + if pid, err := fetchPID(rl); err == nil { + f.relayChan(pid) <- rl + } + } +} + +// waits for worker to connect over socket and returns associated relay of timeout +func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) { + timer := time.NewTimer(tout) + select { + case rl := <-f.relayChan(pid): + timer.Stop() + f.cleanChan(pid) + + return rl, nil + case <-timer.C: + return nil, fmt.Errorf("relay timer for [%v]", pid) + } +} + +// chan to store relay associated with specific Pid +func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay { + f.mu.Lock() + defer f.mu.Unlock() + + rl, ok := f.wait[pid] + if !ok { + f.wait[pid] = make(chan *goridge.SocketRelay) + return f.wait[pid] + } + + return rl +} + +// deletes relay chan associated with specific Pid +func (f *SocketFactory) cleanChan(pid int) { + f.mu.Lock() + defer f.mu.Unlock() + + delete(f.wait, pid) +} + +// send control command to relay and return associated Pid (or error) +func fetchPID(rl goridge.Relay) (pid int, err error) { + if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil { + return 0, err + } + + body, p, err := rl.Receive() + if !p.HasFlag(goridge.PayloadControl) { + return 0, fmt.Errorf("unexpected response, `control` header is missing") + } + + link := &PidCommand{} + if err := json.Unmarshal(body, link); err != nil { + return 0, err + } + + if link.Parent != os.Getpid() { + return 0, fmt.Errorf("integrity error, parent process does not match") + } + + return link.Pid, nil +} diff --git a/source/Exceptions/RoadRunnerException.php b/source/Exceptions/RoadRunnerException.php new file mode 100644 index 00000000..fa7b8da3 --- /dev/null +++ b/source/Exceptions/RoadRunnerException.php @@ -0,0 +1,13 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner\Exceptions; + +class RoadRunnerException extends \RuntimeException +{ + +}
\ No newline at end of file diff --git a/source/Worker.php b/source/Worker.php new file mode 100644 index 00000000..d31b48bd --- /dev/null +++ b/source/Worker.php @@ -0,0 +1,162 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\GoridgeException; +use Spiral\Goridge\RelayInterface as Relay; +use Spiral\RoadRunner\Exceptions\RoadRunnerException; + +/** + * Accepts connection from RoadRunner server over given Goridge relay. + * + * Example: + * + * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + * while ($task = $worker->receive($context)) { + * $worker->send("DONE", json_encode($context)); + * } + */ +class Worker +{ + // Must be set as context value in order to perform controlled demolition of worker + const TERMINATE = "TERMINATE"; + + // Must be set as context value in order to represent content as an error + const ERROR = "ERROR"; + + /** @var Relay */ + private $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Receive packet of information to process, returns null when process must be stopped. Might + * return Error to wrap error message from server. + * + * @param array $context Contains parsed context array send by the server. + * + * @return \Error|null|string + * @throws GoridgeException + */ + public function receive(&$context) + { + $body = $this->relay->receiveSync($flags); + + if ($flags & Relay::PAYLOAD_CONTROL) { + if ($this->handleControl($body, $context)) { + // wait for the next command + return $this->receive($context); + } + + // Expect process termination + return null; + } + + if ($flags & Relay::PAYLOAD_ERROR) { + return new \Error($body); + } + + return $body; + } + + /** + * Respond to the server with result of task execution and execution context. + * + * Example: + * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); + * + * @param string $payload + * @param string $context + */ + public function send(string $payload, string $context = null) + { + if (is_null($context)) { + $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); + } else { + $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); + } + + $this->relay->send($payload, Relay::PAYLOAD_RAW); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message) + { + $this->relay->send( + $message, + Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR + ); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive TerminateCommand context after calling this method. + * + * @throws GoridgeException + */ + public function terminate() + { + $this->send(null, self::TERMINATE); + } + + /** + * Handles incoming control command payload and executes it if required. + * + * @param string $body + * @param array $context Exported context (if any). + * + * @returns bool True when continue processing. + * + * @throws RoadRunnerException + */ + private function handleControl(string $body = null, &$context = null): bool + { + if (is_null($body)) { + // empty prefix + return true; + } + + $parsed = json_decode($body, true); + if ($parsed === false) { + throw new RoadRunnerException("invalid task context, JSON payload is expected"); + } + + // PID negotiation (socket connections only) + if (!empty($parsed['pid'])) { + $this->relay->send(json_encode([ + 'pid' => getmypid(), + 'parent' => $parsed['pid'], + ]), Relay::PAYLOAD_CONTROL); + } + + // termination request + if (!empty($parsed['terminate'])) { + return false; + } + + // not a command but execution context + $context = $parsed; + + return true; + } +}
\ No newline at end of file diff --git a/state.go b/state.go new file mode 100644 index 00000000..c02ae7e7 --- /dev/null +++ b/state.go @@ -0,0 +1,39 @@ +package roadrunner + +// State is current state int. +type State int + +const ( + // StateInactive - no associated process + StateInactive State = iota + // StateBooting - relay attached but w.Start() not executed + StateBooting + // StateReady - ready for job. + StateReady + // StateWorking - working on given payload. + StateWorking + // StateStopped - process has been terminated + StateStopped + // StateError - error State (can't be used) + StateError +) + +// String returns current state as string. +func (s State) String() string { + switch s { + case StateInactive: + return "inactive" + case StateBooting: + return "booting" + case StateReady: + return "ready" + case StateWorking: + return "working" + case StateStopped: + return "stopped" + case StateError: + return "error" + } + + return "undefined" +} diff --git a/tests/broken-client.php b/tests/broken-client.php new file mode 100644 index 00000000..ed5bde20 --- /dev/null +++ b/tests/broken-client.php @@ -0,0 +1,17 @@ +<?php + +use Spiral\Goridge; +use Spiral\RoadRunner; + +/** + * echo client over pipes. + */ +ini_set('display_errors', 'stderr'); +require "vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + +while ($in = $rr->receive($ctx)) { + echo undefined_function(); + $rr->send((string)$in); +}
\ No newline at end of file diff --git a/tests/echo-client.php b/tests/echo-client.php new file mode 100644 index 00000000..22761862 --- /dev/null +++ b/tests/echo-client.php @@ -0,0 +1,20 @@ +<?php + +use Spiral\Goridge; +use Spiral\RoadRunner; + +/** + * echo client over pipes. + */ +ini_set('display_errors', 'stderr'); +require "vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + +while ($in = $rr->receive($ctx)) { + try { + $rr->send((string)$in); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/tests/error-client.php b/tests/error-client.php new file mode 100644 index 00000000..113d1197 --- /dev/null +++ b/tests/error-client.php @@ -0,0 +1,16 @@ +<?php + +use Spiral\Goridge; +use Spiral\RoadRunner; + +/** + * echo client over pipes. + */ +ini_set('display_errors', 'stderr'); +require "vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + +while ($in = $rr->receive($ctx)) { + $rr->error((string)$in); +}
\ No newline at end of file diff --git a/worker.go b/worker.go new file mode 100644 index 00000000..8960b3fa --- /dev/null +++ b/worker.go @@ -0,0 +1,168 @@ +package roadrunner + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/spiral/goridge" + "io" + "os/exec" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// Worker - supervised process with api over goridge.Relay. +type Worker struct { + // State current worker state. + State State + + // Last time worker State has changed + Last time.Time + + // NumExecutions how many times worker have been invoked. + NumExecutions uint64 + + // Pid contains process ID and empty until worker is started. + Pid *int + + cmd *exec.Cmd // underlying command process + err *bytes.Buffer // aggregates stderr + rl goridge.Relay // communication bus with underlying process + mu sync.RWMutex // ensures than only one execution can be run at once +} + +// NewWorker creates new worker +func NewWorker(cmd *exec.Cmd) (*Worker, error) { + w := &Worker{ + cmd: cmd, + err: bytes.NewBuffer(nil), + State: StateInactive, + } + + if w.cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + + return w, nil +} + +// String returns worker description. +func (w *Worker) String() string { + state := w.State.String() + + if w.Pid != nil { + state = state + ", pid:" + strconv.Itoa(*w.Pid) + } + + return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions) +} + +// Start underlying process or return error +func (w *Worker) Start() error { + stderr, err := w.cmd.StderrPipe() + if err != nil { + w.setState(StateError) + return err + } + + // copying all process errors into buffer space + go io.Copy(w.err, stderr) + + if err := w.cmd.Start(); err != nil { + w.setState(StateError) + return w.mockError(err) + } + + w.setState(StateReady) + + return nil +} + +// Execute command and return result and result context. +func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.State != StateReady { + return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State) + } + + w.setState(StateReady) + atomic.AddUint64(&w.NumExecutions, 1) + + if ctx != nil { + if data, err := json.Marshal(ctx); err == nil { + w.rl.Send(data, goridge.PayloadControl) + } else { + return nil, nil, fmt.Errorf("invalid context: %s", err) + } + } else { + w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty) + } + + w.rl.Send(body, 0) + + rCtx, p, err := w.rl.Receive() + + if !p.HasFlag(goridge.PayloadControl) { + return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)")) + } + + if p.HasFlag(goridge.PayloadError) { + w.setState(StateReady) + return nil, nil, JobError(rCtx) + } + + if resp, p, err = w.rl.Receive(); err != nil { + w.setState(StateError) + return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err)) + } + + w.setState(StateReady) + return resp, rCtx, nil +} + +// Stop underlying process or return error. +func (w *Worker) Stop() { + w.mu.Lock() + defer w.mu.Unlock() + + w.setState(StateInactive) + + go func() { + sendCommand(w.rl, &TerminateCommand{Terminate: true}) + }() + + w.cmd.Wait() + w.rl.Close() + + w.setState(StateStopped) +} + +// attach payload/control relay to the worker. +func (w *Worker) attach(rl goridge.Relay) { + w.mu.Lock() + defer w.mu.Unlock() + + w.rl = rl + w.setState(StateBooting) +} + +// sets worker State and it's context (non blocking!). +func (w *Worker) setState(state State) { + // safer? + w.State = state + w.Last = time.Now() +} + +// mockError attaches worker specific error (from stderr) to parent error +func (w *Worker) mockError(err error) WorkerError { + if w.err.Len() != 0 { + return WorkerError(w.err.String()) + } + + return WorkerError(err.Error()) +} diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 00000000..d4d24364 --- /dev/null +++ b/worker_test.go @@ -0,0 +1,130 @@ +package roadrunner + +import ( + "github.com/spiral/goridge" + "github.com/stretchr/testify/assert" + "io" + "os/exec" + "testing" + "time" +) + +func getPipes(cmd *exec.Cmd) (io.ReadCloser, io.WriteCloser) { + in, err := cmd.StdoutPipe() + if err != nil { + panic(err) + } + + out, err := cmd.StdinPipe() + if err != nil { + panic(err) + } + + return in, out +} + +func TestOnStarted(t *testing.T) { + pr := exec.Command("php", "tests/echo-client.php") + pr.Start() + + _, err := NewWorker(pr) + assert.NotNil(t, err) + assert.Equal(t, "can't attach to running process", err.Error()) +} + +func TestNewWorkerState(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) + assert.Nil(t, err) + assert.Equal(t, StateInactive, w.State) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Equal(t, StateBooting, w.State) + + assert.Nil(t, w.Start()) + assert.Equal(t, StateReady, w.State) +} + +func TestStop(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) + assert.Nil(t, err) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Nil(t, w.Start()) + + w.Stop() + assert.Equal(t, StateStopped, w.State) +} + +func TestEcho(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) + assert.Nil(t, err) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Nil(t, w.Start()) + + r, ctx, err := w.Execute([]byte("hello"), nil) + assert.Nil(t, err) + assert.Nil(t, ctx) + assert.Equal(t, "hello", string(r)) +} + +func TestError(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/error-client.php")) + assert.Nil(t, err) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Nil(t, w.Start()) + + r, ctx, err := w.Execute([]byte("hello"), nil) + assert.Nil(t, r) + assert.NotNil(t, err) + assert.Nil(t, ctx) + + assert.IsType(t, JobError{}, err) + assert.Equal(t, "hello", err.Error()) +} + +func TestBroken(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/broken-client.php")) + assert.Nil(t, err) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Nil(t, w.Start()) + + r, ctx, err := w.Execute([]byte("hello"), nil) + assert.Nil(t, r) + assert.NotNil(t, err) + assert.Nil(t, ctx) + + assert.IsType(t, WorkerError(""), err) + assert.Contains(t, err.Error(), "undefined_function()") +} + +func TestNumExecutions(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) + assert.Nil(t, err) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Nil(t, w.Start()) + + w.Execute([]byte("hello"), nil) + assert.Equal(t, uint64(1), w.NumExecutions) + + w.Execute([]byte("hello"), nil) + assert.Equal(t, uint64(2), w.NumExecutions) + + w.Execute([]byte("hello"), nil) + assert.Equal(t, uint64(3), w.NumExecutions) +} + +func TestLastExecution(t *testing.T) { + w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) + assert.Nil(t, err) + + w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) + assert.Nil(t, w.Start()) + + tm := time.Now() + w.Execute([]byte("hello"), nil) + assert.True(t, w.Last.After(tm)) +} |