author     Wolfy-J <[email protected]>    2018-01-23 19:51:15 -0500
committer  Wolfy-J <[email protected]>    2018-01-23 19:51:15 -0500
commit     78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch)
tree       8882b9a051bcc9c42328df583c0bb8c39a89591e
parent     fa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff)
API update
-rw-r--r-- | .gitignore | 5
-rw-r--r-- | README.md | 23
-rw-r--r-- | balancer.go | 130
-rw-r--r-- | balancer_test.go | 1
-rw-r--r-- | commands.go | 27
-rw-r--r-- | composer.lock | 83
-rw-r--r-- | config.go | 40
-rw-r--r-- | config_test.go | 40
-rw-r--r-- | error.go | 17
-rw-r--r-- | error_test.go | 16
-rw-r--r-- | factory.go | 8
-rw-r--r-- | job_error.go | 10
-rw-r--r-- | job_error_test.go | 11
-rw-r--r-- | payload.go | 14
-rw-r--r-- | payload_test.go | 1
-rw-r--r-- | pipe_factory.go | 48
-rw-r--r-- | pipe_factory_test.go | 107
-rw-r--r-- | pool.go | 231
-rw-r--r-- | pool_test.go | 208
-rw-r--r-- | protocol.go | 52
-rw-r--r-- | socket_factory.go | 121
-rw-r--r-- | socket_factory_test.go | 274
-rw-r--r-- | source/Worker.php | 16
-rw-r--r-- | state.go | 88
-rw-r--r-- | state_test.go | 21
-rw-r--r-- | tests/broken-client.php | 17
-rw-r--r-- | tests/broken.php | 14
-rw-r--r-- | tests/client.php | 36
-rw-r--r-- | tests/delay.php | 18
-rw-r--r-- | tests/echo.php (renamed from tests/echo-client.php) | 11
-rw-r--r-- | tests/error-client.php | 16
-rw-r--r-- | tests/error.php | 13
-rw-r--r-- | tests/failboot.php | 3
-rw-r--r-- | tests/pid.php | 17
-rw-r--r-- | tests/slow-client.php | 38
-rw-r--r-- | worker.go | 258
-rw-r--r-- | worker_test.go | 175
37 files changed, 1595 insertions, 613 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..6755eaa6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+.idea
+.idea/*
+composer.lock
+vendor/
+bin/
\ No newline at end of file
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -1,24 +1,21 @@
 RoadRunner
 ==========
-[![Latest Stable Version](https://poser.pugx.org/spiral/roadrunner/v/stable)](https://packagist.org/packages/spiral/roadrunner)
-[![GoDoc](https://godoc.org/github.com/spiral/roadrunner?status.svg)](https://godoc.org/github.com/spiral/roadrunner)
-[![Build Status](https://travis-ci.org/spiral/roadrunner.svg?branch=master)](https://travis-ci.org/spiral/roadrunner)
-[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png)](https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master)
-[![Go Report Card](https://goreportcard.com/badge/github.com/spiral/roadrunner)](https://goreportcard.com/report/github.com/spiral/roadrunner)
-
-PHP application server library for Golang.
+Embeddable PHP application server library for Golang.
 
 Features:
 --------
-- load balancer, process manager and task pipeline
-- hot-swap of workers
+- load balancer, process manager and task pipeline
+- swaps workers without stopping the server
 - build for multiple frontends (queue, rest, psr-7, async php, etc)
-- works over TPC, unix sockets, standard pipes
-- safe worker termination
-- protocol, worker and job level error management
+- works over TCP, unix sockets and standard pipes
+- controlled worker termination
+- automatic worker replacement
+- worker lifecycle management (create/stop/allocate timeouts)
+- payload context
+- protocol, job and worker level error management
 - very fast (~200k calls per second on Ryzen 1700X over 17 threads)
 - works on Windows
 
 License:
 --------
-The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
+The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
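Editor's note: the features listed above map onto the pool API introduced by this commit (`NewPool`, `NewPipeFactory`, `Config`, `Payload`, `Exec`, `Destroy`). Below is a minimal embedding sketch, assuming the import path `github.com/spiral/roadrunner`, a local `php` binary and the repository's `tests/client.php` echo worker; it is an illustration of the API visible in this diff, not code from the commit itself.

```go
package main

import (
	"fmt"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner"
)

func main() {
	// Pool spawns a fixed number of PHP workers over STDIN/STDOUT pipes
	// and routes payloads to whichever worker is free.
	p, err := roadrunner.NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		roadrunner.NewPipeFactory(),
		roadrunner.Config{
			NumWorkers:      4,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy()

	// Exec sends the payload body to a free worker and returns its response.
	rsp, err := p.Exec(&roadrunner.Payload{Body: []byte("hello")})
	if err != nil {
		panic(err)
	}

	fmt.Println(rsp.String()) // prints "hello"
}
```

Because both factories implement the same `Factory` interface, the same calling code should work with the socket factory shown later in this diff.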
\ No newline at end of file diff --git a/balancer.go b/balancer.go index 29fcd8d5..16418d55 100644 --- a/balancer.go +++ b/balancer.go @@ -1,63 +1,71 @@ package roadrunner -import ( - "os/exec" - "sync" -) - -// Balancer provides ability to perform hot-swap between 2 worker pools. -type Balancer struct { - mu sync.Mutex // protects pool hot swapping - pool *Pool // pool to work for user commands -} - -// Spawn initiates underlying pool of workers and replaced old one. -func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) error { - b.mu.Lock() - defer b.mu.Unlock() - - var ( - err error - old *Pool - ) - - old = b.pool - if b.pool, err = NewPool(cmd, factory, cfg); err != nil { - return err - } - - if old != nil { - go func() { - old.Close() - }() - } - - return nil -} - -// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is -// being destroyed. -func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { - b.mu.Lock() - pool := b.pool - b.mu.Unlock() - - return pool.Execute(payload, ctx) -} - -// Workers return list of active workers. -func (b *Balancer) Workers() []*Worker { - b.mu.Lock() - defer b.mu.Unlock() - - return b.pool.Workers() -} - -// Close closes underlying pool. -func (b *Balancer) Close() { - b.mu.Lock() - defer b.mu.Unlock() - - b.pool.Close() - b.pool = nil -} +// +//import ( +// "os/exec" +// "sync" +//) +// +//// Swapper provides ability to perform hot-swap between 2 worker pools. +//type Swapper struct { +// mu sync.Mutex // protects pool hot swapping +// pool *Pool // pool to work for user commands +//} +// +//// Swap initiates underlying pool of workers and replaces old one. +//func (b *Swapper) Swap(cmd func() *exec.Cmd, factory Factory, cfg Config) error { +// var ( +// err error +// prev *Pool +// pool *Pool +// ) +// +// prev = b.pool +// if pool, err = NewPool(cmd, factory, cfg); err != nil { +// return err +// } +// +// if prev != nil { +// go func() { +// prev.Close() +// }() +// } +// +// b.mu.Lock() +// b.pool = pool +// b.mu.Unlock() +// +// return nil +//} +// +//// Exec one task with given payload and context, returns result and context +//// or error. Must not be used once pool is being destroyed. +//func (b *Swapper) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { +// b.mu.Lock() +// pool := b.pool +// b.mu.Unlock() +// +// if pool == nil { +// panic("what") +// } +// +// return pool.Exec(payload, ctx) +//} +// +//// Workers return list of active workers. +//func (b *Swapper) Workers() []*Worker { +// b.mu.Lock() +// pool := b.pool +// b.mu.Unlock() +// +// return pool.Workers() +//} +// +//// Close closes underlying pool. +//func (b *Swapper) Close() { +// b.mu.Lock() +// defer b.mu.Unlock() +// +// b.pool.Close() +// b.pool = nil +//} diff --git a/balancer_test.go b/balancer_test.go new file mode 100644 index 00000000..3f283dce --- /dev/null +++ b/balancer_test.go @@ -0,0 +1 @@ +package roadrunner diff --git a/commands.go b/commands.go deleted file mode 100644 index 5368097f..00000000 --- a/commands.go +++ /dev/null @@ -1,27 +0,0 @@ -package roadrunner - -import ( - "encoding/json" - "github.com/spiral/goridge" -) - -// TerminateCommand must stop underlying process. -type TerminateCommand struct { - Terminate bool `json:"terminate"` -} - -// PidCommand send greeting message between processes in json format. 
-type PidCommand struct { - Pid int `json:"pid"` - Parent int `json:"parent,omitempty"` -} - -// sends control message via relay using JSON encoding -func sendCommand(rl goridge.Relay, command interface{}) error { - bin, err := json.Marshal(command) - if err != nil { - return err - } - - return rl.Send(bin, goridge.PayloadControl) -} diff --git a/composer.lock b/composer.lock index 7646e668..10a511f7 100644 --- a/composer.lock +++ b/composer.lock @@ -4,21 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", "This file is @generated automatically" ], - "hash": "a05da84ea1e9e80a52fc218bb4d952c1", "content-hash": "2deb68e3347a18c68289e209971b087d", "packages": [ { "name": "spiral/goridge", - "version": "v2.0.0", + "version": "v2.0.1", "source": { "type": "git", "url": "https://github.com/spiral/goridge.git", - "reference": "f60182bef09f1e45a47908e1f0fb080affdcab81" + "reference": "24a36898426359ec2a4f645ff8ee06f153ac2f3f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/spiral/goridge/zipball/f60182bef09f1e45a47908e1f0fb080affdcab81", - "reference": "f60182bef09f1e45a47908e1f0fb080affdcab81", + "url": "https://api.github.com/repos/spiral/goridge/zipball/24a36898426359ec2a4f645ff8ee06f153ac2f3f", + "reference": "24a36898426359ec2a4f645ff8ee06f153ac2f3f", "shasum": "" }, "require": { @@ -44,7 +43,7 @@ } ], "description": "High-performance PHP-to-Golang RPC bridge", - "time": "2017-11-17 11:07:26" + "time": "2018-01-23T22:15:20+00:00" } ], "packages-dev": [ @@ -100,7 +99,7 @@ "constructor", "instantiate" ], - "time": "2017-07-22 11:58:36" + "time": "2017-07-22T11:58:36+00:00" }, { "name": "myclabs/deep-copy", @@ -145,7 +144,7 @@ "object", "object graph" ], - "time": "2017-10-19 19:58:43" + "time": "2017-10-19T19:58:43+00:00" }, { "name": "phar-io/manifest", @@ -200,7 +199,7 @@ } ], "description": "Component for reading phar.io manifest information from a PHP Archive (PHAR)", - "time": "2017-03-05 18:14:27" + "time": "2017-03-05T18:14:27+00:00" }, { "name": "phar-io/version", @@ -247,7 +246,7 @@ } ], "description": "Library for handling version information and constraints", - "time": "2017-03-05 17:38:23" + "time": "2017-03-05T17:38:23+00:00" }, { "name": "phpdocumentor/reflection-common", @@ -301,7 +300,7 @@ "reflection", "static analysis" ], - "time": "2017-09-11 18:02:19" + "time": "2017-09-11T18:02:19+00:00" }, { "name": "phpdocumentor/reflection-docblock", @@ -352,7 +351,7 @@ } ], "description": "With this component, a library can provide support for annotations via DocBlocks or otherwise retrieve information that is embedded in a DocBlock.", - "time": "2017-11-27 17:38:31" + "time": "2017-11-27T17:38:31+00:00" }, { "name": "phpdocumentor/type-resolver", @@ -399,7 +398,7 @@ "email": "[email protected]" } ], - "time": "2017-07-14 14:27:02" + "time": "2017-07-14T14:27:02+00:00" }, { "name": "phpspec/prophecy", @@ -462,7 +461,7 @@ "spy", "stub" ], - "time": "2017-11-24 13:59:53" + "time": "2017-11-24T13:59:53+00:00" }, { "name": "phpunit/php-code-coverage", @@ -525,7 +524,7 @@ "testing", "xunit" ], - "time": "2017-12-06 09:29:45" + "time": "2017-12-06T09:29:45+00:00" }, { "name": "phpunit/php-file-iterator", @@ -572,7 +571,7 @@ "filesystem", "iterator" ], - "time": "2017-11-27 13:52:08" + "time": "2017-11-27T13:52:08+00:00" }, { "name": "phpunit/php-text-template", @@ -613,7 +612,7 @@ "keywords": [ "template" ], - "time": "2015-06-21 13:50:34" + "time": "2015-06-21T13:50:34+00:00" }, { "name": "phpunit/php-timer", @@ 
-662,7 +661,7 @@ "keywords": [ "timer" ], - "time": "2017-02-26 11:10:40" + "time": "2017-02-26T11:10:40+00:00" }, { "name": "phpunit/php-token-stream", @@ -711,7 +710,7 @@ "keywords": [ "tokenizer" ], - "time": "2017-11-27 05:48:46" + "time": "2017-11-27T05:48:46+00:00" }, { "name": "phpunit/phpunit", @@ -795,20 +794,20 @@ "testing", "xunit" ], - "time": "2017-12-17 06:31:19" + "time": "2017-12-17T06:31:19+00:00" }, { "name": "phpunit/phpunit-mock-objects", - "version": "5.0.5", + "version": "5.0.6", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit-mock-objects.git", - "reference": "283b9f4f670e3a6fd6c4ff95c51a952eb5c75933" + "reference": "33fd41a76e746b8fa96d00b49a23dadfa8334cdf" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit-mock-objects/zipball/283b9f4f670e3a6fd6c4ff95c51a952eb5c75933", - "reference": "283b9f4f670e3a6fd6c4ff95c51a952eb5c75933", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit-mock-objects/zipball/33fd41a76e746b8fa96d00b49a23dadfa8334cdf", + "reference": "33fd41a76e746b8fa96d00b49a23dadfa8334cdf", "shasum": "" }, "require": { @@ -854,7 +853,7 @@ "mock", "xunit" ], - "time": "2017-12-10 08:01:53" + "time": "2018-01-06T05:45:45+00:00" }, { "name": "sebastian/code-unit-reverse-lookup", @@ -899,20 +898,20 @@ ], "description": "Looks up which function or method a line of code belongs to", "homepage": "https://github.com/sebastianbergmann/code-unit-reverse-lookup/", - "time": "2017-03-04 06:30:41" + "time": "2017-03-04T06:30:41+00:00" }, { "name": "sebastian/comparator", - "version": "2.1.1", + "version": "2.1.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/comparator.git", - "reference": "b11c729f95109b56a0fe9650c6a63a0fcd8c439f" + "reference": "11c07feade1d65453e06df3b3b90171d6d982087" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/comparator/zipball/b11c729f95109b56a0fe9650c6a63a0fcd8c439f", - "reference": "b11c729f95109b56a0fe9650c6a63a0fcd8c439f", + "url": "https://api.github.com/repos/sebastianbergmann/comparator/zipball/11c07feade1d65453e06df3b3b90171d6d982087", + "reference": "11c07feade1d65453e06df3b3b90171d6d982087", "shasum": "" }, "require": { @@ -963,7 +962,7 @@ "compare", "equality" ], - "time": "2017-12-22 14:50:35" + "time": "2018-01-12T06:34:42+00:00" }, { "name": "sebastian/diff", @@ -1015,7 +1014,7 @@ "keywords": [ "diff" ], - "time": "2017-08-03 08:09:46" + "time": "2017-08-03T08:09:46+00:00" }, { "name": "sebastian/environment", @@ -1065,7 +1064,7 @@ "environment", "hhvm" ], - "time": "2017-07-01 08:51:00" + "time": "2017-07-01T08:51:00+00:00" }, { "name": "sebastian/exporter", @@ -1132,7 +1131,7 @@ "export", "exporter" ], - "time": "2017-04-03 13:19:02" + "time": "2017-04-03T13:19:02+00:00" }, { "name": "sebastian/global-state", @@ -1183,7 +1182,7 @@ "keywords": [ "global state" ], - "time": "2017-04-27 15:39:26" + "time": "2017-04-27T15:39:26+00:00" }, { "name": "sebastian/object-enumerator", @@ -1230,7 +1229,7 @@ ], "description": "Traverses array structures and object graphs to enumerate all referenced objects", "homepage": "https://github.com/sebastianbergmann/object-enumerator/", - "time": "2017-08-03 12:35:26" + "time": "2017-08-03T12:35:26+00:00" }, { "name": "sebastian/object-reflector", @@ -1275,7 +1274,7 @@ ], "description": "Allows reflection of object attributes, including inherited and non-public ones", "homepage": "https://github.com/sebastianbergmann/object-reflector/", - "time": 
"2017-03-29 09:07:27" + "time": "2017-03-29T09:07:27+00:00" }, { "name": "sebastian/recursion-context", @@ -1328,7 +1327,7 @@ ], "description": "Provides functionality to recursively process PHP variables", "homepage": "http://www.github.com/sebastianbergmann/recursion-context", - "time": "2017-03-03 06:23:57" + "time": "2017-03-03T06:23:57+00:00" }, { "name": "sebastian/resource-operations", @@ -1370,7 +1369,7 @@ ], "description": "Provides a list of PHP built-in functions that operate on resources", "homepage": "https://www.github.com/sebastianbergmann/resource-operations", - "time": "2015-07-28 20:34:47" + "time": "2015-07-28T20:34:47+00:00" }, { "name": "sebastian/version", @@ -1413,7 +1412,7 @@ ], "description": "Library that helps with managing the version number of Git-hosted PHP projects", "homepage": "https://github.com/sebastianbergmann/version", - "time": "2016-10-03 07:35:21" + "time": "2016-10-03T07:35:21+00:00" }, { "name": "theseer/tokenizer", @@ -1453,7 +1452,7 @@ } ], "description": "A small library for converting tokenized PHP source code into XML and potentially other formats", - "time": "2017-04-07 12:08:54" + "time": "2017-04-07T12:08:54+00:00" }, { "name": "webmozart/assert", @@ -1503,7 +1502,7 @@ "check", "validate" ], - "time": "2016-11-23 20:04:58" + "time": "2016-11-23T20:04:58+00:00" } ], "aliases": [], @@ -1,19 +1,43 @@ package roadrunner -import "time" +import ( + "fmt" + "time" +) // Config defines basic behaviour of worker creation and handling process. type Config struct { - // MaxWorkers defines how many sub-processes can be run at once. This value might be doubled by Balancer while hot-swap. - MaxWorkers uint64 + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. + NumWorkers uint64 - // MaxExecutions defines how many executions is allowed for the worker until it's destruction. Set 1 to create new process - // for each new task, 0 to let worker handle as many tasks as it can. + // MaxExecutions defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. MaxExecutions uint64 - // AllocateTimeout defines for how long pool will be waiting for a worker to be freed to handle the task. + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. AllocateTimeout time.Duration - // DestroyOnError when set to true workers will be destructed after any JobError. - DestroyOnError bool + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly stop, if timeout reached worker will be killed. 
+ DestroyTimeout time.Duration +} + +// Valid returns error if config not valid +func (cfg *Config) Valid() error { + if cfg.NumWorkers == 0 { + return fmt.Errorf("config.NumWorkers must be set") + } + + if cfg.AllocateTimeout == 0 { + return fmt.Errorf("config.AllocateTimeout must be set") + } + + if cfg.DestroyTimeout == 0 { + return fmt.Errorf("config.DestroyTimeout must be set") + } + + return nil } diff --git a/config_test.go b/config_test.go new file mode 100644 index 00000000..64bad7cb --- /dev/null +++ b/config_test.go @@ -0,0 +1,40 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func Test_NumWorkers(t *testing.T) { + cfg := Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second * 10, + } + err := cfg.Valid() + + assert.NotNil(t, err) + assert.Equal(t, "config.NumWorkers must be set", err.Error()) +} + +func Test_AllocateTimeout(t *testing.T) { + cfg := Config{ + NumWorkers: 10, + DestroyTimeout: time.Second * 10, + } + err := cfg.Valid() + + assert.NotNil(t, err) + assert.Equal(t, "config.AllocateTimeout must be set", err.Error()) +} + +func Test_DestroyTimeout(t *testing.T) { + cfg := Config{ + NumWorkers: 10, + AllocateTimeout: time.Second, + } + err := cfg.Valid() + + assert.NotNil(t, err) + assert.Equal(t, "config.DestroyTimeout must be set", err.Error()) +} diff --git a/error.go b/error.go deleted file mode 100644 index a8f9e539..00000000 --- a/error.go +++ /dev/null @@ -1,17 +0,0 @@ -package roadrunner - -// WorkerError is communication/process error. -type WorkerError string - -// Error converts error context to string -func (we WorkerError) Error() string { - return string(we) -} - -// JobError is job level error (no worker halt) -type JobError []byte - -// Error converts error context to string -func (je JobError) Error() string { - return string(je) -} diff --git a/error_test.go b/error_test.go deleted file mode 100644 index e8fa62b4..00000000 --- a/error_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package roadrunner - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestWorkerError_Error(t *testing.T) { - e := WorkerError("error") - assert.Equal(t, "error", e.Error()) -} - -func TestJobError_Error(t *testing.T) { - e := JobError("error") - assert.Equal(t, "error", e.Error()) -} @@ -4,9 +4,7 @@ import "os/exec" // Factory is responsible of wrapping given command into tasks worker. type Factory interface { - // NewWorker creates new worker process based on given process. - NewWorker(cmd *exec.Cmd) (w *Worker, err error) - - // Close closes all open factory descriptors. - Close() error + // SpawnWorker creates new worker process based on given command. + // Process must not be started. 
+ SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) } diff --git a/job_error.go b/job_error.go new file mode 100644 index 00000000..d024ad11 --- /dev/null +++ b/job_error.go @@ -0,0 +1,10 @@ +package roadrunner + +// JobError is job level error (no worker halt), wraps at top +// of error context +type JobError []byte + +// Error converts error context to string +func (je JobError) Error() string { + return string(je) +} diff --git a/job_error_test.go b/job_error_test.go new file mode 100644 index 00000000..9b0fa53e --- /dev/null +++ b/job_error_test.go @@ -0,0 +1,11 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_JobError_Error(t *testing.T) { + e := JobError([]byte("error")) + assert.Equal(t, "error", e.Error()) +} diff --git a/payload.go b/payload.go new file mode 100644 index 00000000..63a709dc --- /dev/null +++ b/payload.go @@ -0,0 +1,14 @@ +package roadrunner + +type Payload struct { + Head, Body []byte +} + +func (p *Payload) HeadString() { + +} + +// String returns payload body as string +func (p *Payload) String() string { + return string(p.Body) +} diff --git a/payload_test.go b/payload_test.go new file mode 100644 index 00000000..3f283dce --- /dev/null +++ b/payload_test.go @@ -0,0 +1 @@ +package roadrunner diff --git a/pipe_factory.go b/pipe_factory.go index ce32dacc..30c34139 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -2,45 +2,59 @@ package roadrunner import ( "github.com/spiral/goridge" + "io" "os/exec" + "github.com/pkg/errors" ) -// PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes). +// PipeFactory connects to workers using standard +// streams (STDIN, STDOUT pipes). type PipeFactory struct { } -// NewPipeFactory returns new factory instance and starts listening +// NewPipeFactory returns new factory instance and starts +// listening func NewPipeFactory() *PipeFactory { return &PipeFactory{} } -// NewWorker creates worker and connects it to appropriate relay or returns error -func (f *PipeFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { - w, err = NewWorker(cmd) - if err != nil { +// SpawnWorker creates new worker and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { + if w, err = newWorker(cmd); err != nil { return nil, err } - in, err := cmd.StdoutPipe() - if err != nil { + var ( + in io.ReadCloser + out io.WriteCloser + ) + + if in, err = cmd.StdoutPipe(); err != nil { return nil, err } - out, err := cmd.StdinPipe() - if err != nil { + if out, err = cmd.StdinPipe(); err != nil { return nil, err } + w.rl = goridge.NewPipeRelay(in, out) + if err := w.Start(); err != nil { - return nil, err + return nil, errors.Wrap(err, "process error") } - w.attach(goridge.NewPipeRelay(in, out)) + // todo: timeout ? + if pid, err := fetchPID(w.rl); pid != *w.Pid { + go func(w *Worker) { w.Kill() }(w) - return w, nil -} + if wErr := w.Wait(); wErr != nil { + err = errors.Wrap(wErr, err.Error()) + } + + return nil, errors.Wrap(err, "unable to connect to worker") + } -// Close closes all open factory descriptors. 
-func (f *PipeFactory) Close() error { - return nil + w.state.set(StateReady) + return w, nil } diff --git a/pipe_factory_test.go b/pipe_factory_test.go new file mode 100644 index 00000000..97e94487 --- /dev/null +++ b/pipe_factory_test.go @@ -0,0 +1,107 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "os/exec" + "testing" +) + +func Test_Pipe_Start(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + w.Stop() +} + +func Test_Pipe_Failboot(t *testing.T) { + cmd := exec.Command("php", "tests/failboot.php") + + w, err := NewPipeFactory().SpawnWorker(cmd) + + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Pipe_Invalid(t *testing.T) { + cmd := exec.Command("php", "tests/invalid.php") + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer w.Stop() + + res, err := w.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Head) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + err := w.Wait() + + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + defer w.Stop() + + res, err := w.Exec(&Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + w.Stop() + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + w.Wait() + }() + defer w.Stop() + + for n := 0; n < b.N; n++ { + if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} @@ -2,103 +2,89 @@ package roadrunner import ( "fmt" + "log" "os/exec" "sync" - "sync/atomic" "time" + "github.com/pkg/errors" ) const ( - // ContextTerminate must be sent by worker in control payload if worker want to die. - ContextTerminate = "TERMINATE" + // Control header to be made by worker to request termination. + TerminateRequest = "{\"terminate\": true}" ) // Pool controls worker creation, destruction and task routing. 
type Pool struct { - cfg Config // pool behaviour - cmd func() *exec.Cmd // worker command creator - factory Factory // creates and connects to workers - numWorkers uint64 // current number of tasks workers - tasks sync.WaitGroup // counts all tasks executions - mua sync.Mutex // protects worker allocation - muw sync.RWMutex // protects state of worker list - workers []*Worker // all registered workers - free chan *Worker // freed workers + // pool behaviour + cfg Config + + // worker command creator + cmd func() *exec.Cmd + + // creates and connects to workers + factory Factory + + // active task executions + tasks sync.WaitGroup + + // workers circular allocation buffer + free chan *Worker + + // protects state of worker list, does not affect allocation + muw sync.RWMutex + + // all registered workers + workers []*Worker } // NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker. func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) { + if err := cfg.Valid(); err != nil { + return nil, errors.Wrap(err, "config error") + } + p := &Pool{ cfg: cfg, cmd: cmd, factory: factory, - workers: make([]*Worker, 0, cfg.MaxWorkers), - free: make(chan *Worker, cfg.MaxWorkers), + workers: make([]*Worker, 0, cfg.NumWorkers), + free: make(chan *Worker, cfg.NumWorkers), } - // to test if worker ready - w, err := p.createWorker() - if err != nil { - return nil, err - } + //todo: watch for error from workers!!! - p.free <- w - return p, nil -} + // constant number of workers simplify logic + for i := uint64(0); i < p.cfg.NumWorkers; i++ { + // to test if worker ready + w, err := p.createWorker() -// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is -// being destroyed. -func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { - p.tasks.Add(1) - defer p.tasks.Done() + if err != nil { + p.Destroy() - w, err := p.allocateWorker() - if err != nil { - return nil, nil, err - } - - if resp, rCtx, err = w.Execute(payload, ctx); err != nil { - if !p.cfg.DestroyOnError { - if err, jobError := err.(JobError); jobError { - p.free <- w - return nil, nil, err - } + return nil, err } - // worker level error - p.destroyWorker(w) - - return nil, nil, err - } - - // controlled destruction - if len(resp) == 0 && string(rCtx) == ContextTerminate { - p.destroyWorker(w) - go func() { - //immediate refill - if w, err := p.createWorker(); err != nil { - p.free <- w + // worker watcher + go func(w *Worker) { + if err := w.Wait(); err != nil { + // todo: register error + log.Println(err) } - }() - - return p.Execute(payload, ctx) - } + }(w) - if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions { - p.destroyWorker(w) - } else { p.free <- w } - return resp, rCtx, nil + return p, nil } -// Config returns associated pool configuration. +// Config returns associated pool configuration. Immutable. func (p *Pool) Config() Config { return p.cfg } -// Workers returns workers associated with the pool. +// Workers returns worker list associated with the pool. func (p *Pool) Workers() (workers []*Worker) { p.muw.RLock() defer p.muw.RUnlock() @@ -110,8 +96,46 @@ func (p *Pool) Workers() (workers []*Worker) { return workers } -// Close all underlying workers (but let them to complete the task). -func (p *Pool) Close() { +// Exec one task with given payload and context, returns result or error. 
+func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) { + p.tasks.Add(1) + defer p.tasks.Done() + + w, err := p.allocateWorker() + if err != nil { + return nil, errors.Wrap(err, "unable to allocate worker") + } + + rsp, err = w.Exec(rqs) + + if err != nil { + // soft job errors are allowed + if _, jobError := err.(JobError); jobError { + p.free <- w + return nil, err + } + + go p.replaceWorker(w, err) + return nil, err + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Head != nil && string(rsp.Head) == TerminateRequest { + go p.replaceWorker(w, err) + return p.Exec(rqs) + } + + if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { + go p.replaceWorker(w, p.cfg.MaxExecutions) + } else { + p.free <- w + } + + return rsp, nil +} + +// Destroy all underlying workers (but let them to complete the task). +func (p *Pool) Destroy() { p.tasks.Wait() var wg sync.WaitGroup @@ -119,6 +143,7 @@ func (p *Pool) Close() { wg.Add(1) go func(w *Worker) { defer wg.Done() + p.destroyWorker(w) }(w) } @@ -127,63 +152,69 @@ func (p *Pool) Close() { } // finds free worker in a given time interval or creates new if allowed. -func (p *Pool) allocateWorker() (*Worker, error) { - p.mua.Lock() - defer p.mua.Unlock() - +func (p *Pool) allocateWorker() (w *Worker, err error) { select { - case w := <-p.free: - // we already have free worker + case w = <-p.free: return w, nil default: - if p.numWorkers < p.cfg.MaxWorkers { - return p.createWorker() - } + // enable timeout handler + } - timeout := time.NewTimer(p.cfg.AllocateTimeout) - select { - case <-timeout.C: - return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout) - case w := <-p.free: - timeout.Stop() - return w, nil - } + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) + case w := <-p.free: + timeout.Stop() + return w, nil } } +// replaces dead or expired worker with new instance +func (p *Pool) replaceWorker(w *Worker, caused interface{}) { + go p.destroyWorker(w) + + nw, _ := p.createWorker() + p.free <- nw +} + // destroy and remove worker from the pool. func (p *Pool) destroyWorker(w *Worker) { - atomic.AddUint64(&p.numWorkers, ^uint64(0)) - - go func() { - w.Stop() + // detaching + p.muw.Lock() + for i, wc := range p.workers { + if wc == w { + p.workers = p.workers[:i+1] + break + } + } + p.muw.Unlock() - p.muw.Lock() - defer p.muw.Unlock() + go w.Stop() - for i, wc := range p.workers { - if wc == w { - p.workers = p.workers[:i+1] - break - } + select { + case <-w.waitDone: + // worker is dead + case <-time.NewTimer(time.Second * 10).C: + // failed to stop process + if err := w.Kill(); err != nil { + //todo: can't kill or already killed? } - }() + } } -// creates new worker (must be called in a locked state). +// creates new worker using associated factory. 
automatically +// adds worker to the worker list (background) func (p *Pool) createWorker() (*Worker, error) { - w, err := p.factory.NewWorker(p.cmd()) + w, err := p.factory.SpawnWorker(p.cmd()) if err != nil { return nil, err } - atomic.AddUint64(&p.numWorkers, 1) + p.muw.Lock() + defer p.muw.Unlock() - go func() { - p.muw.Lock() - defer p.muw.Unlock() - p.workers = append(p.workers, w) - }() + p.workers = append(p.workers, w) return w, nil } diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 00000000..92a594df --- /dev/null +++ b/pool_test.go @@ -0,0 +1,208 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "os/exec" + "runtime" + "sync" + "testing" + "time" + "strconv" +) + +var cfg = Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, +} + +func Test_NewPool(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) +} + +func Test_ConfigError(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_Pool_Echo(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Head) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pool_AllocateTimeout(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Millisecond * 50, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + done := make(chan interface{}) + go func() { + _, err := p.Exec(&Payload{Body: []byte("100")}) + assert.NoError(t, err) + close(done) + }() + + // to ensure that worker is already busy + time.Sleep(time.Millisecond * 10) + + _, err = p.Exec(&Payload{Body: []byte("10")}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "worker timeout") + + <-done + p.Destroy() +} + +//todo: termiante + +func Test_Pool_Replace_Worker(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxExecutions: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + var lastPID string + lastPID = strconv.Itoa(*p.Workers()[0].Pid) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Head) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +func Benchmark_Pool_Allocate(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", 
"pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + for n := 0; n < b.N; n++ { + w, err := p.allocateWorker() + if err != nil { + b.Fail() + } + + p.free <- w + } +} + +func Benchmark_Pool_Echo(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pool_Echo_Batched(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + }() + } + + wg.Wait() +} + +func Benchmark_Pool_Echo_Replaced(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxExecutions: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/protocol.go b/protocol.go new file mode 100644 index 00000000..b78f2807 --- /dev/null +++ b/protocol.go @@ -0,0 +1,52 @@ +package roadrunner + +import ( + "encoding/json" + "fmt" + "github.com/spiral/goridge" + "os" +) + +type stopCommand struct { + Stop bool `json:"stop"` +} + +type pidCommand struct { + Pid int `json:"pid"` +} + +func sendHead(rl goridge.Relay, v interface{}) error { + if v == nil { + rl.Send(nil, goridge.PayloadControl) + } + + if data, ok := v.([]byte); ok { + return rl.Send(data, goridge.PayloadControl) + } + + data, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("invalid payload: %s", err) + } + + return rl.Send(data, goridge.PayloadControl) +} + +func fetchPID(rl goridge.Relay) (pid int, err error) { + if err := sendHead(rl, pidCommand{Pid: os.Getpid()}); err != nil { + return 0, err + } + + body, p, err := rl.Receive() + if !p.HasFlag(goridge.PayloadControl) { + return 0, fmt.Errorf("unexpected response, header is missing") + } + + link := &pidCommand{} + //log.Println(string(body)) + if err := json.Unmarshal(body, link); err != nil { + return 0, err + } + + return link.Pid, nil +} diff --git a/socket_factory.go b/socket_factory.go index 5d1b488b..b915973b 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -1,69 +1,72 @@ package roadrunner import ( - "encoding/json" "fmt" "github.com/spiral/goridge" "net" - "os" "os/exec" "sync" "time" + "github.com/pkg/errors" ) // SocketFactory connects to external workers using socket server. type SocketFactory struct { - ls net.Listener // listens for incoming connections from underlying processes - tout time.Duration // connection timeout - mu sync.Mutex // protects socket mapping - wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // protects socket mapping + mu sync.Mutex + + // sockets which are waiting for process association + relays map[int]chan *goridge.SocketRelay } -// NewSocketFactory returns SocketFactory attached to a given socket listener. 
tout specifies for how long factory -// should wait for incoming relay connection +// NewSocketFactory returns SocketFactory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { f := &SocketFactory{ - ls: ls, - tout: tout, - wait: make(map[int]chan *goridge.SocketRelay), + ls: ls, + tout: tout, + relays: make(map[int]chan *goridge.SocketRelay), } go f.listen() + return f } -// NewWorker creates worker and connects it to appropriate relay or returns error -func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { - w, err = NewWorker(cmd) - if err != nil { - return nil, err +// SpawnWorker creates worker and connects it to appropriate relay or returns error +func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, workerError error) { + if w, workerError = newWorker(cmd); workerError != nil { + return nil, workerError } if err := w.Start(); err != nil { - return nil, err - } - - w.Pid = &w.cmd.Process.Pid - if w.Pid == nil { - return nil, fmt.Errorf("can't to start worker %s", w) + return nil, errors.Wrap(err, "process error") } - rl, err := f.waitRelay(*w.Pid, f.tout) + rl, err := f.findRelay(w, f.tout) if err != nil { - return nil, fmt.Errorf("can't connect to worker %s: %s", w, err) + go func(w *Worker) { w.Kill() }(w) + + if wErr := w.Wait(); wErr != nil { + err = errors.Wrap(wErr, err.Error()) + } + + return nil, errors.Wrap(err, "unable to connect to worker") } - w.attach(rl) + w.rl = rl + w.state.set(StateReady) return w, nil } -// Close closes all open factory descriptors. -func (f *SocketFactory) Close() error { - return f.ls.Close() -} - -// listen for incoming wait and associate sockets with active workers +// listens for incoming socket connections func (f *SocketFactory) listen() { for { conn, err := f.ls.Accept() @@ -79,16 +82,23 @@ func (f *SocketFactory) listen() { } // waits for worker to connect over socket and returns associated relay of timeout -func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) { +func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) { timer := time.NewTimer(tout) - select { - case rl := <-f.relayChan(pid): - timer.Stop() - f.cleanChan(pid) - - return rl, nil - case <-timer.C: - return nil, fmt.Errorf("relay timer for [%v]", pid) + for { + select { + case rl := <-f.relayChan(*w.Pid): + timer.Stop() + f.cleanChan(*w.Pid) + return rl, nil + + case <-timer.C: + return nil, fmt.Errorf("relay timeout") + + case <-w.waitDone: + timer.Stop() + f.cleanChan(*w.Pid) + return nil, fmt.Errorf("worker is gone") + } } } @@ -97,10 +107,10 @@ func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay { f.mu.Lock() defer f.mu.Unlock() - rl, ok := f.wait[pid] + rl, ok := f.relays[pid] if !ok { - f.wait[pid] = make(chan *goridge.SocketRelay) - return f.wait[pid] + f.relays[pid] = make(chan *goridge.SocketRelay) + return f.relays[pid] } return rl @@ -111,28 +121,5 @@ func (f *SocketFactory) cleanChan(pid int) { f.mu.Lock() defer f.mu.Unlock() - delete(f.wait, pid) -} - -// send control command to relay and return associated Pid (or error) -func fetchPID(rl goridge.Relay) (pid int, err error) { - if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil { - return 0, err - } - - body, p, err := rl.Receive() - if !p.HasFlag(goridge.PayloadControl) { - return 0, fmt.Errorf("unexpected response, `control` header 
is missing") - } - - link := &PidCommand{} - if err := json.Unmarshal(body, link); err != nil { - return 0, err - } - - if link.Parent != os.Getpid() { - return 0, fmt.Errorf("integrity error, parent process does not match") - } - - return link.Pid, nil + delete(f.relays, pid) } diff --git a/socket_factory_test.go b/socket_factory_test.go new file mode 100644 index 00000000..c64c607f --- /dev/null +++ b/socket_factory_test.go @@ -0,0 +1,274 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "net" + "os/exec" + "runtime" + "testing" + "time" +) + +func Test_Tcp_Start(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer ls.Close() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + + w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + w.Stop() +} + +func Test_Tcp_Failboot(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer ls.Close() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/failboot.php") + + w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Tcp_Timeout(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer ls.Close() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0") + + w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "relay timeout") +} + +func Test_Tcp_Invalid(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer ls.Close() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/invalid.php") + + w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer ls.Close() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/client.php", "broken", "tcp") + + w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + go func() { + err := w.Wait() + + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + defer w.Stop() + + res, err := w.Exec(&Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Test_Tcp_Echo(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer ls.Close() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + + w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer w.Stop() + + res, err := w.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Head) + + assert.Equal(t, "hello", res.String()) +} + +//todo: test relay timeout +//todo: test dead workers + +//func Test_Tcp_Errored(t *testing.T) { +// defer time.Sleep(time.Millisecond * 10) // free socket +// +// ls, err := net.Listen("tcp", "localhost:9007") +// if assert.NoError(t, err) { 
+// defer ls.Destroy() +// } else { +// t.Skip("socket is busy") +// } +// +// cmd := exec.Command("php", "tests/invalid.php") +// w, err := NewSocketFactory(ls).SpawnWorker(cmd) +// assert.Nil(t, w) +// assert.NotNil(t, err) +// +// assert.Equal(t, "unable to connect to worker: worker is gone", err.Error()) +//} +// +//func Test_Tcp_Broken(t *testing.T) { +// defer time.Sleep(time.Millisecond * 10) // free socket +// +// ls, err := net.Listen("tcp", "localhost:9007") +// if assert.NoError(t, err) { +// defer ls.Destroy() +// } else { +// t.Skip("socket is busy") +// } +// +// cmd := exec.Command("php", "tests/client.php", "broken", "tcp") +// w, err := NewSocketFactory(ls).SpawnWorker(cmd) +// defer w.Destroy() +// +// r, ctx, err := w.Exec([]byte("hello"), nil) +// assert.Nil(t, r) +// assert.NotNil(t, err) +// assert.Nil(t, ctx) +// +// assert.IsType(t, WorkerError(errors.New("")), err) +// assert.Contains(t, err.Error(), "undefined_function()") +//} +// + +func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { + ls, err := net.Listen("tcp", "localhost:9007") + if err == nil { + defer ls.Close() + } else { + b.Skip("socket is busy") + } + + f := NewSocketFactory(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + w.Stop() + } +} + +func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { + ls, err := net.Listen("tcp", "localhost:9007") + if err == nil { + defer ls.Close() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + + w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + go func() { + w.Wait() + }() + defer w.Stop() + + for n := 0; n < b.N; n++ { + if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { + if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + b.Skip("not supported on " + runtime.GOOS) + } + + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer ls.Close() + } else { + b.Skip("socket is busy") + } + + f := NewSocketFactory(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "tests/client.php", "echo", "unix") + + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + w.Stop() + } +} + +func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { + if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { + b.Skip("not supported on " + runtime.GOOS) + } + + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer ls.Close() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/client.php", "echo", "unix") + + w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + go func() { + w.Wait() + }() + defer w.Stop() + + for n := 0; n < b.N; n++ { + if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/source/Worker.php b/source/Worker.php index d31b48bd..57e8fbf5 100644 --- a/source/Worker.php +++ b/source/Worker.php @@ -87,6 +87,7 @@ class Worker $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); } + //todo: null payload? 
$this->relay->send($payload, Relay::PAYLOAD_RAW); } @@ -136,26 +137,23 @@ class Worker return true; } - $parsed = json_decode($body, true); - if ($parsed === false) { + $p = json_decode($body, true); + if ($p === false) { throw new RoadRunnerException("invalid task context, JSON payload is expected"); } // PID negotiation (socket connections only) - if (!empty($parsed['pid'])) { - $this->relay->send(json_encode([ - 'pid' => getmypid(), - 'parent' => $parsed['pid'], - ]), Relay::PAYLOAD_CONTROL); + if (!empty($p['pid'])) { + $this->relay->send(sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL); } // termination request - if (!empty($parsed['terminate'])) { + if (!empty($p['stop'])) { return false; } // not a command but execution context - $context = $parsed; + $context = $p; return true; } @@ -1,39 +1,101 @@ package roadrunner -// State is current state int. -type State int +import ( + "sync" + "sync/atomic" + "time" +) + +// State represents worker status and updated time. +type State interface { + // Value returns state value + Value() int64 //todo: change to state value + + // NumExecs shows how many times worker was invoked + NumExecs() uint64 + + // Updated indicates a moment updated last state change + Updated() time.Time +} const ( // StateInactive - no associated process - StateInactive State = iota - // StateBooting - relay attached but w.Start() not executed - StateBooting + StateInactive int64 = iota // StateReady - ready for job. StateReady // StateWorking - working on given payload. StateWorking // StateStopped - process has been terminated StateStopped - // StateError - error State (can't be used) - StateError + // StateErrored - error state (can't be used) + StateErrored ) +type state struct { + mu sync.RWMutex + value int64 + numExecs uint64 + updated time.Time +} + +func newState(value int64) *state { + return &state{value: value, updated: time.Now()} +} + // String returns current state as string. 
-func (s State) String() string { - switch s { +func (s *state) String() string { + switch s.value { case StateInactive: return "inactive" - case StateBooting: - return "booting" case StateReady: return "ready" case StateWorking: return "working" case StateStopped: return "stopped" - case StateError: - return "error" + case StateErrored: + return "errored" } return "undefined" } + +// Value state returns state value +func (s *state) Value() int64 { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.value +} + +// IsActive returns true if worker not Inactive or Stopped +func (s *state) IsActive() bool { + state := s.Value() + return state == StateWorking || state == StateReady +} + +// Updated indicates a moment updated last state change +func (s *state) Updated() time.Time { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.updated +} + +func (s *state) NumExecs() uint64 { + return atomic.LoadUint64(&s.numExecs) +} + +// change state value (status) +func (s *state) set(value int64) { + s.mu.Lock() + defer s.mu.Unlock() + + s.value = value + s.updated = time.Now() +} + +// register new execution atomically +func (s *state) registerExec() { + atomic.AddUint64(&s.numExecs, 1) +} diff --git a/state_test.go b/state_test.go new file mode 100644 index 00000000..965ba6b6 --- /dev/null +++ b/state_test.go @@ -0,0 +1,21 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_NewState(t *testing.T) { + st := newState(StateErrored) + + assert.Equal(t, "errored", st.String()) + assert.NotEqual(t, 0, st.Updated().Unix()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, newState(StateInactive).IsActive()) + assert.True(t, newState(StateReady).IsActive()) + assert.True(t, newState(StateWorking).IsActive()) + assert.False(t, newState(StateStopped).IsActive()) + assert.False(t, newState(StateErrored).IsActive()) +} diff --git a/tests/broken-client.php b/tests/broken-client.php deleted file mode 100644 index ed5bde20..00000000 --- a/tests/broken-client.php +++ /dev/null @@ -1,17 +0,0 @@ -<?php - -use Spiral\Goridge; -use Spiral\RoadRunner; - -/** - * echo client over pipes. - */ -ini_set('display_errors', 'stderr'); -require "vendor/autoload.php"; - -$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); - -while ($in = $rr->receive($ctx)) { - echo undefined_function(); - $rr->send((string)$in); -}
\ No newline at end of file diff --git a/tests/broken.php b/tests/broken.php new file mode 100644 index 00000000..b1a3839e --- /dev/null +++ b/tests/broken.php @@ -0,0 +1,14 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + echo undefined_function(); + $rr->send((string)$in); +}
\ No newline at end of file diff --git a/tests/client.php b/tests/client.php new file mode 100644 index 00000000..1f1d21b1 --- /dev/null +++ b/tests/client.php @@ -0,0 +1,36 @@ +<?php + +use Spiral\Goridge; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/vendor/autoload.php"; + +if (count($argv) < 3) { + die("need 2 arguments"); +} + +list($test, $goridge) = [$argv[1], $argv[2]]; + +switch ($goridge) { + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("localhost", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketRelay::SOCK_UNIX + ); + + break; + + default: + die("invalid protocol selection"); +} + +require_once sprintf("%s/%s.php", __DIR__, $test);
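Editor's note: `tests/client.php` above selects its relay from the second argument (`pipes`, `tcp` or `unix`), which is what the socket factory tests rely on. A hedged sketch of the TCP path, mirroring `socket_factory_test.go` from this commit (same import-path assumption as the earlier sketch):

```go
package main

import (
	"fmt"
	"log"
	"net"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner"
)

func main() {
	// The worker connects back to this listener because client.php is
	// started with the "tcp" relay argument (see the switch above).
	ls, err := net.Listen("tcp", "localhost:9007")
	if err != nil {
		log.Fatal(err)
	}
	defer ls.Close()

	// The factory waits up to a minute for the PID handshake over the socket.
	f := roadrunner.NewSocketFactory(ls, time.Minute)

	w, err := f.SpawnWorker(exec.Command("php", "tests/client.php", "echo", "tcp"))
	if err != nil {
		log.Fatal(err)
	}
	defer w.Stop()

	// When using a factory directly, Wait() must be handled by the caller.
	go func() {
		if err := w.Wait(); err != nil {
			log.Println(err)
		}
	}()

	rsp, err := w.Exec(&roadrunner.Payload{Body: []byte("hello")})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(rsp.String()) // prints "hello"
}
```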
\ No newline at end of file diff --git a/tests/delay.php b/tests/delay.php new file mode 100644 index 00000000..bfde2fc4 --- /dev/null +++ b/tests/delay.php @@ -0,0 +1,18 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + usleep($in * 1000); + $rr->send(''); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/tests/echo-client.php b/tests/echo.php index 22761862..ba58ff30 100644 --- a/tests/echo-client.php +++ b/tests/echo.php @@ -1,15 +1,12 @@ <?php +/** + * @var Goridge\RelayInterface $relay + */ use Spiral\Goridge; use Spiral\RoadRunner; -/** - * echo client over pipes. - */ -ini_set('display_errors', 'stderr'); -require "vendor/autoload.php"; - -$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$rr = new RoadRunner\Worker($relay); while ($in = $rr->receive($ctx)) { try { diff --git a/tests/error-client.php b/tests/error-client.php deleted file mode 100644 index 113d1197..00000000 --- a/tests/error-client.php +++ /dev/null @@ -1,16 +0,0 @@ -<?php - -use Spiral\Goridge; -use Spiral\RoadRunner; - -/** - * echo client over pipes. - */ -ini_set('display_errors', 'stderr'); -require "vendor/autoload.php"; - -$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); - -while ($in = $rr->receive($ctx)) { - $rr->error((string)$in); -}
\ No newline at end of file diff --git a/tests/error.php b/tests/error.php new file mode 100644 index 00000000..ebd3418b --- /dev/null +++ b/tests/error.php @@ -0,0 +1,13 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + $rr->error((string)$in); +}
\ No newline at end of file diff --git a/tests/failboot.php b/tests/failboot.php new file mode 100644 index 00000000..fa8b96f6 --- /dev/null +++ b/tests/failboot.php @@ -0,0 +1,3 @@ +<?php +ini_set('display_errors', 'stderr'); +throw new Error("failboot error");
\ No newline at end of file diff --git a/tests/pid.php b/tests/pid.php new file mode 100644 index 00000000..a8cfa229 --- /dev/null +++ b/tests/pid.php @@ -0,0 +1,17 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + $rr->send((string)getmypid()); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/tests/slow-client.php b/tests/slow-client.php new file mode 100644 index 00000000..f09142b5 --- /dev/null +++ b/tests/slow-client.php @@ -0,0 +1,38 @@ +<?php + +use Spiral\Goridge; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/vendor/autoload.php"; + +if (count($argv) < 3) { + die("need 2 arguments"); +} + +list($test, $goridge, $bootDelay, $shutdownDelay) = [$argv[1], $argv[2], $argv[3], $argv[4]]; + +switch ($goridge) { + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("localhost", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketRelay::SOCK_UNIX + ); + + break; + + default: + die("invalid protocol selection"); +} + +usleep($bootDelay * 1000); +require_once sprintf("%s/%s.php", __DIR__, $test); +usleep($shutdownDelay * 1000);
\ No newline at end of file
@@ -2,167 +2,233 @@ package roadrunner
 
 import (
 	"bytes"
-	"encoding/json"
 	"fmt"
 	"github.com/spiral/goridge"
-	"io"
+	"os"
 	"os/exec"
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
-	"time"
+	"github.com/pkg/errors"
 )
 
 // Worker - supervised process with api over goridge.Relay.
 type Worker struct {
-	// State current worker state.
-	State State
+	// Pid of the process; points to the Pid of the underlying process and
+	// can be nil while the process is not started.
+	Pid *int
 
-	// Last time worker State has changed
-	Last time.Time
+	// state holds information about current worker state,
+	// number of worker executions and last status change time.
+	// publicly this object is read-only and protected using a mutex
+	// and an atomic counter.
+	state *state
 
-	// NumExecutions how many times worker have been invoked.
-	NumExecutions uint64
+	// underlying command with associated process; the command must be
+	// provided to the worker from outside in non-started form. The command's
+	// stderr is handled by the worker to aggregate error messages.
+	cmd *exec.Cmd
 
-	// Pid contains process ID and empty until worker is started.
-	Pid *int
+	// err aggregates stderr output from the underlying process. The value can
+	// be read only once the command is completed and all pipes are closed.
+	err *bytes.Buffer
+
+	// channel is closed once the command is complete.
+	waitDone chan interface{}
+
+	// contains information about the resulting process state.
+	endState *os.ProcessState
 
-	cmd *exec.Cmd     // underlying command process
-	err *bytes.Buffer // aggregates stderr
-	rl  goridge.Relay // communication bus with underlying process
-	mu  sync.RWMutex  // ensures than only one execution can be run at once
+	// ensures that only one execution can run at a time.
+	mu sync.Mutex
+
+	// communication bus with underlying process.
+	rl goridge.Relay
 }
 
-// NewWorker creates new worker
-func NewWorker(cmd *exec.Cmd) (*Worker, error) {
-	w := &Worker{
-		cmd:   cmd,
-		err:   bytes.NewBuffer(nil),
-		State: StateInactive,
+// newWorker creates new worker over given exec.cmd.
+func newWorker(cmd *exec.Cmd) (*Worker, error) {
+	if cmd.Process != nil {
+		return nil, fmt.Errorf("can't attach to running process")
 	}
 
-	if w.cmd.Process != nil {
-		return nil, fmt.Errorf("can't attach to running process")
+	w := &Worker{
+		cmd:      cmd,
+		err:      new(bytes.Buffer),
+		waitDone: make(chan interface{}),
+		state:    newState(StateInactive),
 	}
 
+	// piping all stderr to command buffer
+	w.cmd.Stderr = w.err
+
 	return w, nil
 }
 
+// State returns a read-only worker state object; it can be used to safely access
+// worker status, the time when the status changed and the number of executions.
+func (w *Worker) State() State {
+	return w.state
+}
+
 // String returns worker description.
func (w *Worker) String() string {
-	state := w.State.String()
-
+	state := w.state.String()
 	if w.Pid != nil {
 		state = state + ", pid:" + strconv.Itoa(*w.Pid)
 	}
 
-	return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions)
+	return fmt.Sprintf(
+		"(`%s` [%s], numExecs: %v)",
+		strings.Join(w.cmd.Args, " "),
+		state,
+		w.state.NumExecs(),
+	)
 }
 
 // Start underlying process or return error
 func (w *Worker) Start() error {
-	stderr, err := w.cmd.StderrPipe()
-	if err != nil {
-		w.setState(StateError)
-		return err
+	if w.cmd.Process != nil {
+		return fmt.Errorf("process already running")
 	}
 
-	// copying all process errors into buffer space
-	go io.Copy(w.err, stderr)
-
 	if err := w.cmd.Start(); err != nil {
-		w.setState(StateError)
-		return w.mockError(err)
+		close(w.waitDone)
+
+		return err
 	}
 
-	w.setState(StateReady)
+	w.Pid = &w.cmd.Process.Pid
+
+	// wait for the process to complete
+	go func() {
+		w.endState, _ = w.cmd.Process.Wait()
+		if w.waitDone != nil {
+			w.state.set(StateStopped)
+			close(w.waitDone)
+
+			if w.rl != nil {
+				w.mu.Lock()
+				defer w.mu.Unlock()
+
+				w.rl.Close()
+			}
+		}
+	}()
 
 	return nil
 }
 
-// Execute command and return result and result context.
-func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
-	w.mu.Lock()
-	defer w.mu.Unlock()
+// Wait must be called once for each worker; the call is released once the worker
+// is complete and returns the process error (if any). If stderr output is present,
+// it is returned as the error value. Wait also returns an error if the php process
+// fails to find or start the script.
+func (w *Worker) Wait() error {
+	<-w.waitDone
 
-	if w.State != StateReady {
-		return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State)
-	}
+	// ensure that all pipe descriptors are closed
+	w.cmd.Wait()
 
-	w.setState(StateReady)
-	atomic.AddUint64(&w.NumExecutions, 1)
+	if w.endState.Success() {
+		return nil
+	}
 
-	if ctx != nil {
-		if data, err := json.Marshal(ctx); err == nil {
-			w.rl.Send(data, goridge.PayloadControl)
-		} else {
-			return nil, nil, fmt.Errorf("invalid context: %s", err)
-		}
-	} else {
-		w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty)
+	if w.err.Len() != 0 {
+		return errors.New(w.err.String())
 	}
 
-	w.rl.Send(body, 0)
+	// generic process error
+	return &exec.ExitError{ProcessState: w.endState}
+}
 
-	rCtx, p, err := w.rl.Receive()
+// Stop sends a soft termination command to the worker to properly stop the process.
+func (w *Worker) Stop() error {
+	select {
+	case <-w.waitDone:
+		return nil
+	default:
+		w.mu.Lock()
+		defer w.mu.Unlock()
 
-	if !p.HasFlag(goridge.PayloadControl) {
-		return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
-	}
+		w.state.set(StateInactive)
+		err := sendHead(w.rl, &stopCommand{Stop: true})
 
-	if p.HasFlag(goridge.PayloadError) {
-		w.setState(StateReady)
-		return nil, nil, JobError(rCtx)
+		<-w.waitDone
+		return err
 	}
+}
 
-	if resp, p, err = w.rl.Receive(); err != nil {
-		w.setState(StateError)
-		return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err))
-	}
+// Kill terminates the underlying process; make sure to call Wait() to collect
+// the error log from stderr.
+func (w *Worker) Kill() error {
+	select {
+	case <-w.waitDone:
+		return nil
+	default:
+		w.mu.Lock()
+		defer w.mu.Unlock()
 
-	w.setState(StateReady)
-	return resp, rCtx, nil
+		w.state.set(StateInactive)
+		err := w.cmd.Process.Kill()
+
+		<-w.waitDone
+		return err
+	}
 }
 
-// Stop underlying process or return error.
-func (w *Worker) Stop() {
+func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 
-	w.setState(StateInactive)
+	if rqs == nil {
+		return nil, fmt.Errorf("payload cannot be empty")
+	}
 
-	go func() {
-		sendCommand(w.rl, &TerminateCommand{Terminate: true})
-	}()
+	if w.state.Value() != StateReady {
+		return nil, fmt.Errorf("worker is not ready (%v)", w.state.Value())
+	}
 
-	w.cmd.Wait()
-	w.rl.Close()
+	w.state.set(StateWorking)
+	defer w.state.registerExec()
 
-	w.setState(StateStopped)
-}
+	rsp, err = w.execPayload(rqs)
 
-// attach payload/control relay to the worker.
-func (w *Worker) attach(rl goridge.Relay) {
-	w.mu.Lock()
-	defer w.mu.Unlock()
+	if err != nil {
+		if _, ok := err.(JobError); !ok {
+			w.state.set(StateErrored)
+			return nil, err
+		}
+	}
 
-	w.rl = rl
-	w.setState(StateBooting)
+	w.state.set(StateReady)
+	return rsp, err
 }
 
-// sets worker State and it's context (non blocking!).
-func (w *Worker) setState(state State) {
-	// safer?
-	w.State = state
-	w.Last = time.Now()
-}
+func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
+	if err := sendHead(w.rl, rqs.Head); err != nil {
+		return nil, errors.Wrap(err, "header error")
+	}
 
-// mockError attaches worker specific error (from stderr) to parent error
-func (w *Worker) mockError(err error) WorkerError {
-	if w.err.Len() != 0 {
-		return WorkerError(w.err.String())
+	w.rl.Send(rqs.Body, 0)
+
+	var pr goridge.Prefix
+	rsp = new(Payload)
+
+	if rsp.Head, pr, err = w.rl.Receive(); err != nil {
+		return nil, errors.Wrap(err, "worker error")
+	}
+
+	if !pr.HasFlag(goridge.PayloadControl) {
+		return nil, fmt.Errorf("malformed worker response")
+	}
+
+	if pr.HasFlag(goridge.PayloadError) {
+		return nil, JobError(rsp.Head)
+	}
+
+	if rsp.Body, pr, err = w.rl.Receive(); err != nil {
+		return nil, errors.Wrap(err, "worker error")
 	}
 
-	return WorkerError(err.Error())
+	return rsp, nil
 }
diff --git a/worker_test.go b/worker_test.go
index d4d24364..0cb6f2d4 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -1,130 +1,141 @@
 package roadrunner
 
 import (
-	"github.com/spiral/goridge"
 	"github.com/stretchr/testify/assert"
-	"io"
 	"os/exec"
 	"testing"
 	"time"
 )
 
-func getPipes(cmd *exec.Cmd) (io.ReadCloser, io.WriteCloser) {
-	in, err := cmd.StdoutPipe()
-	if err != nil {
-		panic(err)
-	}
+func Test_GetState(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
 
-	out, err := cmd.StdinPipe()
-	if err != nil {
-		panic(err)
-	}
+	w, err := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		assert.NoError(t, w.Wait())
+	}()
 
-	return in, out
-}
-
-func TestOnStarted(t *testing.T) {
-	pr := exec.Command("php", "tests/echo-client.php")
-	pr.Start()
+	assert.NoError(t, err)
+	assert.NotNil(t, w)
 
-	_, err := NewWorker(pr)
-	assert.NotNil(t, err)
-	assert.Equal(t, "can't attach to running process", err.Error())
+	assert.Equal(t, StateReady, w.State().Value())
+	w.Stop()
+	assert.Equal(t, StateStopped, w.State().Value())
 }
 
-func TestNewWorkerState(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
-	assert.Nil(t, err)
-	assert.Equal(t, StateInactive, w.State)
+func Test_Echo(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Equal(t, StateBooting, w.State)
+	w, _ := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		assert.NoError(t, w.Wait())
+	}()
+	defer w.Stop()
 
-	assert.Nil(t, w.Start())
-	assert.Equal(t, StateReady, w.State)
-}
+	res, err := w.Exec(&Payload{Body: []byte("hello")})
 
-func TestStop(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
 	assert.Nil(t, err)
+	assert.NotNil(t, res)
+	assert.NotNil(t, res.Body)
+	assert.Nil(t, res.Head)
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Nil(t, w.Start())
-
-	w.Stop()
-	assert.Equal(t, StateStopped, w.State)
+	assert.Equal(t, "hello", res.String())
 }
 
-func TestEcho(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
-	assert.Nil(t, err)
+func Test_Echo_Slow(t *testing.T) {
+	cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10")
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Nil(t, w.Start())
+	w, _ := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		assert.NoError(t, w.Wait())
+	}()
+	defer w.Stop()
+
+	res, err := w.Exec(&Payload{Body: []byte("hello")})
 
-	r, ctx, err := w.Execute([]byte("hello"), nil)
 	assert.Nil(t, err)
-	assert.Nil(t, ctx)
-	assert.Equal(t, "hello", string(r))
+	assert.NotNil(t, res)
+	assert.NotNil(t, res.Body)
+	assert.Nil(t, res.Head)
+
+	assert.Equal(t, "hello", res.String())
 }
 
-func TestError(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/error-client.php"))
-	assert.Nil(t, err)
+func Test_Broken(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Nil(t, w.Start())
+	w, err := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		err := w.Wait()
+		assert.Error(t, err)
+		assert.Contains(t, err.Error(), "undefined_function()")
+	}()
+	defer w.Stop()
 
-	r, ctx, err := w.Execute([]byte("hello"), nil)
-	assert.Nil(t, r)
+	res, err := w.Exec(&Payload{Body: []byte("hello")})
+	assert.Nil(t, res)
 	assert.NotNil(t, err)
-	assert.Nil(t, ctx)
+}
 
-	assert.IsType(t, JobError{}, err)
-	assert.Equal(t, "hello", err.Error())
+func Test_OnStarted(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+	assert.Nil(t, cmd.Start())
+
+	w, err := newWorker(cmd)
+	assert.Nil(t, w)
+	assert.NotNil(t, err)
+
+	assert.Equal(t, "can't attach to running process", err.Error())
 }
 
-func TestBroken(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/broken-client.php"))
-	assert.Nil(t, err)
+func Test_Error(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "error", "pipes")
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Nil(t, w.Start())
+	w, _ := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		assert.NoError(t, w.Wait())
+	}()
+	defer w.Stop()
 
-	r, ctx, err := w.Execute([]byte("hello"), nil)
-	assert.Nil(t, r)
+	res, err := w.Exec(&Payload{Body: []byte("hello")})
+	assert.Nil(t, res)
 	assert.NotNil(t, err)
-	assert.Nil(t, ctx)
 
-	assert.IsType(t, WorkerError(""), err)
-	assert.Contains(t, err.Error(), "undefined_function()")
+	assert.IsType(t, JobError{}, err)
+	assert.Equal(t, "hello", err.Error())
 }
 
-func TestNumExecutions(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
-	assert.Nil(t, err)
+func Test_NumExecs(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Nil(t, w.Start())
+	w, _ := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		assert.NoError(t, w.Wait())
+	}()
+	defer w.Stop()
 
-	w.Execute([]byte("hello"), nil)
-	assert.Equal(t, uint64(1), w.NumExecutions)
+	w.Exec(&Payload{Body: []byte("hello")})
+	assert.Equal(t, uint64(1), w.State().NumExecs())
 
-	w.Execute([]byte("hello"), nil)
-	assert.Equal(t, uint64(2), w.NumExecutions)
+	w.Exec(&Payload{Body: []byte("hello")})
+	assert.Equal(t, uint64(2), w.State().NumExecs())
 
-	w.Execute([]byte("hello"), nil)
-	assert.Equal(t, uint64(3), w.NumExecutions)
+	w.Exec(&Payload{Body: []byte("hello")})
+	assert.Equal(t, uint64(3), w.State().NumExecs())
 }
 
-func TestLastExecution(t *testing.T) {
-	w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
-	assert.Nil(t, err)
+func Test_StateUpdated(t *testing.T) {
+	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
 
-	w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
-	assert.Nil(t, w.Start())
+	w, _ := NewPipeFactory().SpawnWorker(cmd)
+	go func() {
+		assert.NoError(t, w.Wait())
+	}()
+	defer w.Stop()
 
 	tm := time.Now()
-	w.Execute([]byte("hello"), nil)
-	assert.True(t, w.Last.After(tm))
+	time.Sleep(time.Millisecond)
+
+	w.Exec(&Payload{Body: []byte("hello")})
+	assert.True(t, w.State().Updated().After(tm))
 }
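The tests above double as documentation for the reworked embedding API: a worker is spawned through a factory, Wait() is consumed in a goroutine, requests travel through Exec() as Payload values, and worker status is read via State(). The following is a minimal usage sketch assembled from those tests; it assumes the library is imported as github.com/spiral/roadrunner and that a worker script such as tests/client.php is reachable from the working directory. It is an illustration only, not part of the commit.

package main

import (
	"fmt"
	"os/exec"

	rr "github.com/spiral/roadrunner" // assumed import path
)

func main() {
	// the command must be passed in non-started form; the pipe factory
	// starts it and attaches the goridge relay over STDIN/STDOUT
	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")

	w, err := rr.NewPipeFactory().SpawnWorker(cmd)
	if err != nil {
		panic(err)
	}

	// Wait must be called once per worker to collect the final process state
	go func() {
		if err := w.Wait(); err != nil {
			fmt.Println("worker exited with error:", err)
		}
	}()

	// push a payload through the worker and read the response body
	rsp, err := w.Exec(&rr.Payload{Body: []byte("hello")})
	if err != nil {
		panic(err)
	}

	fmt.Println(rsp.String())         // "hello"
	fmt.Println(w.State().NumExecs()) // 1

	// soft termination; Kill() is available for hard termination
	w.Stop()
}

On the PHP side the worker script follows the same pattern as tests/echo.php: construct Spiral\RoadRunner\Worker over a Goridge relay, then loop over receive()/send().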