summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore5
-rw-r--r--README.md23
-rw-r--r--balancer.go130
-rw-r--r--balancer_test.go1
-rw-r--r--commands.go27
-rw-r--r--composer.lock83
-rw-r--r--config.go40
-rw-r--r--config_test.go40
-rw-r--r--error.go17
-rw-r--r--error_test.go16
-rw-r--r--factory.go8
-rw-r--r--job_error.go10
-rw-r--r--job_error_test.go11
-rw-r--r--payload.go14
-rw-r--r--payload_test.go1
-rw-r--r--pipe_factory.go48
-rw-r--r--pipe_factory_test.go107
-rw-r--r--pool.go231
-rw-r--r--pool_test.go208
-rw-r--r--protocol.go52
-rw-r--r--socket_factory.go121
-rw-r--r--socket_factory_test.go274
-rw-r--r--source/Worker.php16
-rw-r--r--state.go88
-rw-r--r--state_test.go21
-rw-r--r--tests/broken-client.php17
-rw-r--r--tests/broken.php14
-rw-r--r--tests/client.php36
-rw-r--r--tests/delay.php18
-rw-r--r--tests/echo.php (renamed from tests/echo-client.php)11
-rw-r--r--tests/error-client.php16
-rw-r--r--tests/error.php13
-rw-r--r--tests/failboot.php3
-rw-r--r--tests/pid.php17
-rw-r--r--tests/slow-client.php38
-rw-r--r--worker.go258
-rw-r--r--worker_test.go175
37 files changed, 1595 insertions, 613 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..6755eaa6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+.idea
+.idea/*
+composer.lock
+vendor/
+bin/ \ No newline at end of file
diff --git a/README.md b/README.md
index b4ab2759..ee64e880 100644
--- a/README.md
+++ b/README.md
@@ -1,24 +1,21 @@
RoadRunner
==========
-[![Latest Stable Version](https://poser.pugx.org/spiral/roadrunner/v/stable)](https://packagist.org/packages/spiral/roadrunner)
-[![GoDoc](https://godoc.org/github.com/spiral/roadrunner?status.svg)](https://godoc.org/github.com/spiral/roadrunner)
-[![Build Status](https://travis-ci.org/spiral/roadrunner.svg?branch=master)](https://travis-ci.org/spiral/roadrunner)
-[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png)](https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master)
-[![Go Report Card](https://goreportcard.com/badge/github.com/spiral/roadrunner)](https://goreportcard.com/report/github.com/spiral/roadrunner)
-
-PHP application server library for Golang.
+Embeddable PHP application server library for Golang.
Features:
--------
-- load balancer, process manager and task pipeline
-- hot-swap of workers
+- load balancer, process manager and task pipeline
+- swaps workers without stopping the server
- build for multiple frontends (queue, rest, psr-7, async php, etc)
-- works over TPC, unix sockets, standard pipes
-- safe worker termination
-- protocol, worker and job level error management
+- works over TPC, unix sockets and standard pipes
+- controlled worker termination
+- automatic worker replacement
+- worker lifecycle management (create/stop/allocate timeouts)
+- payload context
+- protocol, job and worker level error management
- very fast (~200k calls per second on Ryzen 1700X over 17 threads)
- works on Windows
License:
--------
-The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
+The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. \ No newline at end of file
diff --git a/balancer.go b/balancer.go
index 29fcd8d5..16418d55 100644
--- a/balancer.go
+++ b/balancer.go
@@ -1,63 +1,71 @@
package roadrunner
-import (
- "os/exec"
- "sync"
-)
-
-// Balancer provides ability to perform hot-swap between 2 worker pools.
-type Balancer struct {
- mu sync.Mutex // protects pool hot swapping
- pool *Pool // pool to work for user commands
-}
-
-// Spawn initiates underlying pool of workers and replaced old one.
-func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- var (
- err error
- old *Pool
- )
-
- old = b.pool
- if b.pool, err = NewPool(cmd, factory, cfg); err != nil {
- return err
- }
-
- if old != nil {
- go func() {
- old.Close()
- }()
- }
-
- return nil
-}
-
-// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
-// being destroyed.
-func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
- b.mu.Lock()
- pool := b.pool
- b.mu.Unlock()
-
- return pool.Execute(payload, ctx)
-}
-
-// Workers return list of active workers.
-func (b *Balancer) Workers() []*Worker {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- return b.pool.Workers()
-}
-
-// Close closes underlying pool.
-func (b *Balancer) Close() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- b.pool.Close()
- b.pool = nil
-}
+//
+//import (
+// "os/exec"
+// "sync"
+//)
+//
+//// Swapper provides ability to perform hot-swap between 2 worker pools.
+//type Swapper struct {
+// mu sync.Mutex // protects pool hot swapping
+// pool *Pool // pool to work for user commands
+//}
+//
+//// Swap initiates underlying pool of workers and replaces old one.
+//func (b *Swapper) Swap(cmd func() *exec.Cmd, factory Factory, cfg Config) error {
+// var (
+// err error
+// prev *Pool
+// pool *Pool
+// )
+//
+// prev = b.pool
+// if pool, err = NewPool(cmd, factory, cfg); err != nil {
+// return err
+// }
+//
+// if prev != nil {
+// go func() {
+// prev.Close()
+// }()
+// }
+//
+// b.mu.Lock()
+// b.pool = pool
+// b.mu.Unlock()
+//
+// return nil
+//}
+//
+//// Exec one task with given payload and context, returns result and context
+//// or error. Must not be used once pool is being destroyed.
+//func (b *Swapper) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
+// b.mu.Lock()
+// pool := b.pool
+// b.mu.Unlock()
+//
+// if pool == nil {
+// panic("what")
+// }
+//
+// return pool.Exec(payload, ctx)
+//}
+//
+//// Workers return list of active workers.
+//func (b *Swapper) Workers() []*Worker {
+// b.mu.Lock()
+// pool := b.pool
+// b.mu.Unlock()
+//
+// return pool.Workers()
+//}
+//
+//// Close closes underlying pool.
+//func (b *Swapper) Close() {
+// b.mu.Lock()
+// defer b.mu.Unlock()
+//
+// b.pool.Close()
+// b.pool = nil
+//}
diff --git a/balancer_test.go b/balancer_test.go
new file mode 100644
index 00000000..3f283dce
--- /dev/null
+++ b/balancer_test.go
@@ -0,0 +1 @@
+package roadrunner
diff --git a/commands.go b/commands.go
deleted file mode 100644
index 5368097f..00000000
--- a/commands.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package roadrunner
-
-import (
- "encoding/json"
- "github.com/spiral/goridge"
-)
-
-// TerminateCommand must stop underlying process.
-type TerminateCommand struct {
- Terminate bool `json:"terminate"`
-}
-
-// PidCommand send greeting message between processes in json format.
-type PidCommand struct {
- Pid int `json:"pid"`
- Parent int `json:"parent,omitempty"`
-}
-
-// sends control message via relay using JSON encoding
-func sendCommand(rl goridge.Relay, command interface{}) error {
- bin, err := json.Marshal(command)
- if err != nil {
- return err
- }
-
- return rl.Send(bin, goridge.PayloadControl)
-}
diff --git a/composer.lock b/composer.lock
index 7646e668..10a511f7 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,21 +4,20 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file",
"This file is @generated automatically"
],
- "hash": "a05da84ea1e9e80a52fc218bb4d952c1",
"content-hash": "2deb68e3347a18c68289e209971b087d",
"packages": [
{
"name": "spiral/goridge",
- "version": "v2.0.0",
+ "version": "v2.0.1",
"source": {
"type": "git",
"url": "https://github.com/spiral/goridge.git",
- "reference": "f60182bef09f1e45a47908e1f0fb080affdcab81"
+ "reference": "24a36898426359ec2a4f645ff8ee06f153ac2f3f"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/spiral/goridge/zipball/f60182bef09f1e45a47908e1f0fb080affdcab81",
- "reference": "f60182bef09f1e45a47908e1f0fb080affdcab81",
+ "url": "https://api.github.com/repos/spiral/goridge/zipball/24a36898426359ec2a4f645ff8ee06f153ac2f3f",
+ "reference": "24a36898426359ec2a4f645ff8ee06f153ac2f3f",
"shasum": ""
},
"require": {
@@ -44,7 +43,7 @@
}
],
"description": "High-performance PHP-to-Golang RPC bridge",
- "time": "2017-11-17 11:07:26"
+ "time": "2018-01-23T22:15:20+00:00"
}
],
"packages-dev": [
@@ -100,7 +99,7 @@
"constructor",
"instantiate"
],
- "time": "2017-07-22 11:58:36"
+ "time": "2017-07-22T11:58:36+00:00"
},
{
"name": "myclabs/deep-copy",
@@ -145,7 +144,7 @@
"object",
"object graph"
],
- "time": "2017-10-19 19:58:43"
+ "time": "2017-10-19T19:58:43+00:00"
},
{
"name": "phar-io/manifest",
@@ -200,7 +199,7 @@
}
],
"description": "Component for reading phar.io manifest information from a PHP Archive (PHAR)",
- "time": "2017-03-05 18:14:27"
+ "time": "2017-03-05T18:14:27+00:00"
},
{
"name": "phar-io/version",
@@ -247,7 +246,7 @@
}
],
"description": "Library for handling version information and constraints",
- "time": "2017-03-05 17:38:23"
+ "time": "2017-03-05T17:38:23+00:00"
},
{
"name": "phpdocumentor/reflection-common",
@@ -301,7 +300,7 @@
"reflection",
"static analysis"
],
- "time": "2017-09-11 18:02:19"
+ "time": "2017-09-11T18:02:19+00:00"
},
{
"name": "phpdocumentor/reflection-docblock",
@@ -352,7 +351,7 @@
}
],
"description": "With this component, a library can provide support for annotations via DocBlocks or otherwise retrieve information that is embedded in a DocBlock.",
- "time": "2017-11-27 17:38:31"
+ "time": "2017-11-27T17:38:31+00:00"
},
{
"name": "phpdocumentor/type-resolver",
@@ -399,7 +398,7 @@
"email": "[email protected]"
}
],
- "time": "2017-07-14 14:27:02"
+ "time": "2017-07-14T14:27:02+00:00"
},
{
"name": "phpspec/prophecy",
@@ -462,7 +461,7 @@
"spy",
"stub"
],
- "time": "2017-11-24 13:59:53"
+ "time": "2017-11-24T13:59:53+00:00"
},
{
"name": "phpunit/php-code-coverage",
@@ -525,7 +524,7 @@
"testing",
"xunit"
],
- "time": "2017-12-06 09:29:45"
+ "time": "2017-12-06T09:29:45+00:00"
},
{
"name": "phpunit/php-file-iterator",
@@ -572,7 +571,7 @@
"filesystem",
"iterator"
],
- "time": "2017-11-27 13:52:08"
+ "time": "2017-11-27T13:52:08+00:00"
},
{
"name": "phpunit/php-text-template",
@@ -613,7 +612,7 @@
"keywords": [
"template"
],
- "time": "2015-06-21 13:50:34"
+ "time": "2015-06-21T13:50:34+00:00"
},
{
"name": "phpunit/php-timer",
@@ -662,7 +661,7 @@
"keywords": [
"timer"
],
- "time": "2017-02-26 11:10:40"
+ "time": "2017-02-26T11:10:40+00:00"
},
{
"name": "phpunit/php-token-stream",
@@ -711,7 +710,7 @@
"keywords": [
"tokenizer"
],
- "time": "2017-11-27 05:48:46"
+ "time": "2017-11-27T05:48:46+00:00"
},
{
"name": "phpunit/phpunit",
@@ -795,20 +794,20 @@
"testing",
"xunit"
],
- "time": "2017-12-17 06:31:19"
+ "time": "2017-12-17T06:31:19+00:00"
},
{
"name": "phpunit/phpunit-mock-objects",
- "version": "5.0.5",
+ "version": "5.0.6",
"source": {
"type": "git",
"url": "https://github.com/sebastianbergmann/phpunit-mock-objects.git",
- "reference": "283b9f4f670e3a6fd6c4ff95c51a952eb5c75933"
+ "reference": "33fd41a76e746b8fa96d00b49a23dadfa8334cdf"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/sebastianbergmann/phpunit-mock-objects/zipball/283b9f4f670e3a6fd6c4ff95c51a952eb5c75933",
- "reference": "283b9f4f670e3a6fd6c4ff95c51a952eb5c75933",
+ "url": "https://api.github.com/repos/sebastianbergmann/phpunit-mock-objects/zipball/33fd41a76e746b8fa96d00b49a23dadfa8334cdf",
+ "reference": "33fd41a76e746b8fa96d00b49a23dadfa8334cdf",
"shasum": ""
},
"require": {
@@ -854,7 +853,7 @@
"mock",
"xunit"
],
- "time": "2017-12-10 08:01:53"
+ "time": "2018-01-06T05:45:45+00:00"
},
{
"name": "sebastian/code-unit-reverse-lookup",
@@ -899,20 +898,20 @@
],
"description": "Looks up which function or method a line of code belongs to",
"homepage": "https://github.com/sebastianbergmann/code-unit-reverse-lookup/",
- "time": "2017-03-04 06:30:41"
+ "time": "2017-03-04T06:30:41+00:00"
},
{
"name": "sebastian/comparator",
- "version": "2.1.1",
+ "version": "2.1.2",
"source": {
"type": "git",
"url": "https://github.com/sebastianbergmann/comparator.git",
- "reference": "b11c729f95109b56a0fe9650c6a63a0fcd8c439f"
+ "reference": "11c07feade1d65453e06df3b3b90171d6d982087"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/sebastianbergmann/comparator/zipball/b11c729f95109b56a0fe9650c6a63a0fcd8c439f",
- "reference": "b11c729f95109b56a0fe9650c6a63a0fcd8c439f",
+ "url": "https://api.github.com/repos/sebastianbergmann/comparator/zipball/11c07feade1d65453e06df3b3b90171d6d982087",
+ "reference": "11c07feade1d65453e06df3b3b90171d6d982087",
"shasum": ""
},
"require": {
@@ -963,7 +962,7 @@
"compare",
"equality"
],
- "time": "2017-12-22 14:50:35"
+ "time": "2018-01-12T06:34:42+00:00"
},
{
"name": "sebastian/diff",
@@ -1015,7 +1014,7 @@
"keywords": [
"diff"
],
- "time": "2017-08-03 08:09:46"
+ "time": "2017-08-03T08:09:46+00:00"
},
{
"name": "sebastian/environment",
@@ -1065,7 +1064,7 @@
"environment",
"hhvm"
],
- "time": "2017-07-01 08:51:00"
+ "time": "2017-07-01T08:51:00+00:00"
},
{
"name": "sebastian/exporter",
@@ -1132,7 +1131,7 @@
"export",
"exporter"
],
- "time": "2017-04-03 13:19:02"
+ "time": "2017-04-03T13:19:02+00:00"
},
{
"name": "sebastian/global-state",
@@ -1183,7 +1182,7 @@
"keywords": [
"global state"
],
- "time": "2017-04-27 15:39:26"
+ "time": "2017-04-27T15:39:26+00:00"
},
{
"name": "sebastian/object-enumerator",
@@ -1230,7 +1229,7 @@
],
"description": "Traverses array structures and object graphs to enumerate all referenced objects",
"homepage": "https://github.com/sebastianbergmann/object-enumerator/",
- "time": "2017-08-03 12:35:26"
+ "time": "2017-08-03T12:35:26+00:00"
},
{
"name": "sebastian/object-reflector",
@@ -1275,7 +1274,7 @@
],
"description": "Allows reflection of object attributes, including inherited and non-public ones",
"homepage": "https://github.com/sebastianbergmann/object-reflector/",
- "time": "2017-03-29 09:07:27"
+ "time": "2017-03-29T09:07:27+00:00"
},
{
"name": "sebastian/recursion-context",
@@ -1328,7 +1327,7 @@
],
"description": "Provides functionality to recursively process PHP variables",
"homepage": "http://www.github.com/sebastianbergmann/recursion-context",
- "time": "2017-03-03 06:23:57"
+ "time": "2017-03-03T06:23:57+00:00"
},
{
"name": "sebastian/resource-operations",
@@ -1370,7 +1369,7 @@
],
"description": "Provides a list of PHP built-in functions that operate on resources",
"homepage": "https://www.github.com/sebastianbergmann/resource-operations",
- "time": "2015-07-28 20:34:47"
+ "time": "2015-07-28T20:34:47+00:00"
},
{
"name": "sebastian/version",
@@ -1413,7 +1412,7 @@
],
"description": "Library that helps with managing the version number of Git-hosted PHP projects",
"homepage": "https://github.com/sebastianbergmann/version",
- "time": "2016-10-03 07:35:21"
+ "time": "2016-10-03T07:35:21+00:00"
},
{
"name": "theseer/tokenizer",
@@ -1453,7 +1452,7 @@
}
],
"description": "A small library for converting tokenized PHP source code into XML and potentially other formats",
- "time": "2017-04-07 12:08:54"
+ "time": "2017-04-07T12:08:54+00:00"
},
{
"name": "webmozart/assert",
@@ -1503,7 +1502,7 @@
"check",
"validate"
],
- "time": "2016-11-23 20:04:58"
+ "time": "2016-11-23T20:04:58+00:00"
}
],
"aliases": [],
diff --git a/config.go b/config.go
index e5d78d49..e48cefc2 100644
--- a/config.go
+++ b/config.go
@@ -1,19 +1,43 @@
package roadrunner
-import "time"
+import (
+ "fmt"
+ "time"
+)
// Config defines basic behaviour of worker creation and handling process.
type Config struct {
- // MaxWorkers defines how many sub-processes can be run at once. This value might be doubled by Balancer while hot-swap.
- MaxWorkers uint64
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap.
+ NumWorkers uint64
- // MaxExecutions defines how many executions is allowed for the worker until it's destruction. Set 1 to create new process
- // for each new task, 0 to let worker handle as many tasks as it can.
+ // MaxExecutions defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
MaxExecutions uint64
- // AllocateTimeout defines for how long pool will be waiting for a worker to be freed to handle the task.
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task.
AllocateTimeout time.Duration
- // DestroyOnError when set to true workers will be destructed after any JobError.
- DestroyOnError bool
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly stop, if timeout reached worker will be killed.
+ DestroyTimeout time.Duration
+}
+
+// Valid returns error if config not valid
+func (cfg *Config) Valid() error {
+ if cfg.NumWorkers == 0 {
+ return fmt.Errorf("config.NumWorkers must be set")
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ return fmt.Errorf("config.AllocateTimeout must be set")
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ return fmt.Errorf("config.DestroyTimeout must be set")
+ }
+
+ return nil
}
diff --git a/config_test.go b/config_test.go
new file mode 100644
index 00000000..64bad7cb
--- /dev/null
+++ b/config_test.go
@@ -0,0 +1,40 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func Test_NumWorkers(t *testing.T) {
+ cfg := Config{
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second * 10,
+ }
+ err := cfg.Valid()
+
+ assert.NotNil(t, err)
+ assert.Equal(t, "config.NumWorkers must be set", err.Error())
+}
+
+func Test_AllocateTimeout(t *testing.T) {
+ cfg := Config{
+ NumWorkers: 10,
+ DestroyTimeout: time.Second * 10,
+ }
+ err := cfg.Valid()
+
+ assert.NotNil(t, err)
+ assert.Equal(t, "config.AllocateTimeout must be set", err.Error())
+}
+
+func Test_DestroyTimeout(t *testing.T) {
+ cfg := Config{
+ NumWorkers: 10,
+ AllocateTimeout: time.Second,
+ }
+ err := cfg.Valid()
+
+ assert.NotNil(t, err)
+ assert.Equal(t, "config.DestroyTimeout must be set", err.Error())
+}
diff --git a/error.go b/error.go
deleted file mode 100644
index a8f9e539..00000000
--- a/error.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package roadrunner
-
-// WorkerError is communication/process error.
-type WorkerError string
-
-// Error converts error context to string
-func (we WorkerError) Error() string {
- return string(we)
-}
-
-// JobError is job level error (no worker halt)
-type JobError []byte
-
-// Error converts error context to string
-func (je JobError) Error() string {
- return string(je)
-}
diff --git a/error_test.go b/error_test.go
deleted file mode 100644
index e8fa62b4..00000000
--- a/error_test.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package roadrunner
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestWorkerError_Error(t *testing.T) {
- e := WorkerError("error")
- assert.Equal(t, "error", e.Error())
-}
-
-func TestJobError_Error(t *testing.T) {
- e := JobError("error")
- assert.Equal(t, "error", e.Error())
-}
diff --git a/factory.go b/factory.go
index e58d9364..97ea3a87 100644
--- a/factory.go
+++ b/factory.go
@@ -4,9 +4,7 @@ import "os/exec"
// Factory is responsible of wrapping given command into tasks worker.
type Factory interface {
- // NewWorker creates new worker process based on given process.
- NewWorker(cmd *exec.Cmd) (w *Worker, err error)
-
- // Close closes all open factory descriptors.
- Close() error
+ // SpawnWorker creates new worker process based on given command.
+ // Process must not be started.
+ SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
}
diff --git a/job_error.go b/job_error.go
new file mode 100644
index 00000000..d024ad11
--- /dev/null
+++ b/job_error.go
@@ -0,0 +1,10 @@
+package roadrunner
+
+// JobError is job level error (no worker halt), wraps at top
+// of error context
+type JobError []byte
+
+// Error converts error context to string
+func (je JobError) Error() string {
+ return string(je)
+}
diff --git a/job_error_test.go b/job_error_test.go
new file mode 100644
index 00000000..9b0fa53e
--- /dev/null
+++ b/job_error_test.go
@@ -0,0 +1,11 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_JobError_Error(t *testing.T) {
+ e := JobError([]byte("error"))
+ assert.Equal(t, "error", e.Error())
+}
diff --git a/payload.go b/payload.go
new file mode 100644
index 00000000..63a709dc
--- /dev/null
+++ b/payload.go
@@ -0,0 +1,14 @@
+package roadrunner
+
+type Payload struct {
+ Head, Body []byte
+}
+
+func (p *Payload) HeadString() {
+
+}
+
+// String returns payload body as string
+func (p *Payload) String() string {
+ return string(p.Body)
+}
diff --git a/payload_test.go b/payload_test.go
new file mode 100644
index 00000000..3f283dce
--- /dev/null
+++ b/payload_test.go
@@ -0,0 +1 @@
+package roadrunner
diff --git a/pipe_factory.go b/pipe_factory.go
index ce32dacc..30c34139 100644
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -2,45 +2,59 @@ package roadrunner
import (
"github.com/spiral/goridge"
+ "io"
"os/exec"
+ "github.com/pkg/errors"
)
-// PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).
+// PipeFactory connects to workers using standard
+// streams (STDIN, STDOUT pipes).
type PipeFactory struct {
}
-// NewPipeFactory returns new factory instance and starts listening
+// NewPipeFactory returns new factory instance and starts
+// listening
func NewPipeFactory() *PipeFactory {
return &PipeFactory{}
}
-// NewWorker creates worker and connects it to appropriate relay or returns error
-func (f *PipeFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
- w, err = NewWorker(cmd)
- if err != nil {
+// SpawnWorker creates new worker and connects it to goridge relay,
+// method Wait() must be handled on level above.
+func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
+ if w, err = newWorker(cmd); err != nil {
return nil, err
}
- in, err := cmd.StdoutPipe()
- if err != nil {
+ var (
+ in io.ReadCloser
+ out io.WriteCloser
+ )
+
+ if in, err = cmd.StdoutPipe(); err != nil {
return nil, err
}
- out, err := cmd.StdinPipe()
- if err != nil {
+ if out, err = cmd.StdinPipe(); err != nil {
return nil, err
}
+ w.rl = goridge.NewPipeRelay(in, out)
+
if err := w.Start(); err != nil {
- return nil, err
+ return nil, errors.Wrap(err, "process error")
}
- w.attach(goridge.NewPipeRelay(in, out))
+ // todo: timeout ?
+ if pid, err := fetchPID(w.rl); pid != *w.Pid {
+ go func(w *Worker) { w.Kill() }(w)
- return w, nil
-}
+ if wErr := w.Wait(); wErr != nil {
+ err = errors.Wrap(wErr, err.Error())
+ }
+
+ return nil, errors.Wrap(err, "unable to connect to worker")
+ }
-// Close closes all open factory descriptors.
-func (f *PipeFactory) Close() error {
- return nil
+ w.state.set(StateReady)
+ return w, nil
}
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
new file mode 100644
index 00000000..97e94487
--- /dev/null
+++ b/pipe_factory_test.go
@@ -0,0 +1,107 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os/exec"
+ "testing"
+)
+
+func Test_Pipe_Start(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ w.Stop()
+}
+
+func Test_Pipe_Failboot(t *testing.T) {
+ cmd := exec.Command("php", "tests/failboot.php")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Pipe_Invalid(t *testing.T) {
+ cmd := exec.Command("php", "tests/invalid.php")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Echo(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
+
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Pipe_Broken(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ err := w.Wait()
+
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+ defer w.Stop()
+
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res)
+}
+
+func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
+ f := NewPipeFactory()
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := f.SpawnWorker(cmd)
+ go func() {
+ if w.Wait() != nil {
+ b.Fail()
+ }
+ }()
+
+ w.Stop()
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ w.Wait()
+ }()
+ defer w.Stop()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
diff --git a/pool.go b/pool.go
index 50f14e4e..75708175 100644
--- a/pool.go
+++ b/pool.go
@@ -2,103 +2,89 @@ package roadrunner
import (
"fmt"
+ "log"
"os/exec"
"sync"
- "sync/atomic"
"time"
+ "github.com/pkg/errors"
)
const (
- // ContextTerminate must be sent by worker in control payload if worker want to die.
- ContextTerminate = "TERMINATE"
+ // Control header to be made by worker to request termination.
+ TerminateRequest = "{\"terminate\": true}"
)
// Pool controls worker creation, destruction and task routing.
type Pool struct {
- cfg Config // pool behaviour
- cmd func() *exec.Cmd // worker command creator
- factory Factory // creates and connects to workers
- numWorkers uint64 // current number of tasks workers
- tasks sync.WaitGroup // counts all tasks executions
- mua sync.Mutex // protects worker allocation
- muw sync.RWMutex // protects state of worker list
- workers []*Worker // all registered workers
- free chan *Worker // freed workers
+ // pool behaviour
+ cfg Config
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // creates and connects to workers
+ factory Factory
+
+ // active task executions
+ tasks sync.WaitGroup
+
+ // workers circular allocation buffer
+ free chan *Worker
+
+ // protects state of worker list, does not affect allocation
+ muw sync.RWMutex
+
+ // all registered workers
+ workers []*Worker
}
// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
+ if err := cfg.Valid(); err != nil {
+ return nil, errors.Wrap(err, "config error")
+ }
+
p := &Pool{
cfg: cfg,
cmd: cmd,
factory: factory,
- workers: make([]*Worker, 0, cfg.MaxWorkers),
- free: make(chan *Worker, cfg.MaxWorkers),
+ workers: make([]*Worker, 0, cfg.NumWorkers),
+ free: make(chan *Worker, cfg.NumWorkers),
}
- // to test if worker ready
- w, err := p.createWorker()
- if err != nil {
- return nil, err
- }
+ //todo: watch for error from workers!!!
- p.free <- w
- return p, nil
-}
+ // constant number of workers simplify logic
+ for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ // to test if worker ready
+ w, err := p.createWorker()
-// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
-// being destroyed.
-func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
- p.tasks.Add(1)
- defer p.tasks.Done()
+ if err != nil {
+ p.Destroy()
- w, err := p.allocateWorker()
- if err != nil {
- return nil, nil, err
- }
-
- if resp, rCtx, err = w.Execute(payload, ctx); err != nil {
- if !p.cfg.DestroyOnError {
- if err, jobError := err.(JobError); jobError {
- p.free <- w
- return nil, nil, err
- }
+ return nil, err
}
- // worker level error
- p.destroyWorker(w)
-
- return nil, nil, err
- }
-
- // controlled destruction
- if len(resp) == 0 && string(rCtx) == ContextTerminate {
- p.destroyWorker(w)
- go func() {
- //immediate refill
- if w, err := p.createWorker(); err != nil {
- p.free <- w
+ // worker watcher
+ go func(w *Worker) {
+ if err := w.Wait(); err != nil {
+ // todo: register error
+ log.Println(err)
}
- }()
-
- return p.Execute(payload, ctx)
- }
+ }(w)
- if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions {
- p.destroyWorker(w)
- } else {
p.free <- w
}
- return resp, rCtx, nil
+ return p, nil
}
-// Config returns associated pool configuration.
+// Config returns associated pool configuration. Immutable.
func (p *Pool) Config() Config {
return p.cfg
}
-// Workers returns workers associated with the pool.
+// Workers returns worker list associated with the pool.
func (p *Pool) Workers() (workers []*Worker) {
p.muw.RLock()
defer p.muw.RUnlock()
@@ -110,8 +96,46 @@ func (p *Pool) Workers() (workers []*Worker) {
return workers
}
-// Close all underlying workers (but let them to complete the task).
-func (p *Pool) Close() {
+// Exec one task with given payload and context, returns result or error.
+func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
+ p.tasks.Add(1)
+ defer p.tasks.Done()
+
+ w, err := p.allocateWorker()
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to allocate worker")
+ }
+
+ rsp, err = w.Exec(rqs)
+
+ if err != nil {
+ // soft job errors are allowed
+ if _, jobError := err.(JobError); jobError {
+ p.free <- w
+ return nil, err
+ }
+
+ go p.replaceWorker(w, err)
+ return nil, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Head != nil && string(rsp.Head) == TerminateRequest {
+ go p.replaceWorker(w, err)
+ return p.Exec(rqs)
+ }
+
+ if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+ go p.replaceWorker(w, p.cfg.MaxExecutions)
+ } else {
+ p.free <- w
+ }
+
+ return rsp, nil
+}
+
+// Destroy all underlying workers (but let them to complete the task).
+func (p *Pool) Destroy() {
p.tasks.Wait()
var wg sync.WaitGroup
@@ -119,6 +143,7 @@ func (p *Pool) Close() {
wg.Add(1)
go func(w *Worker) {
defer wg.Done()
+
p.destroyWorker(w)
}(w)
}
@@ -127,63 +152,69 @@ func (p *Pool) Close() {
}
// finds free worker in a given time interval or creates new if allowed.
-func (p *Pool) allocateWorker() (*Worker, error) {
- p.mua.Lock()
- defer p.mua.Unlock()
-
+func (p *Pool) allocateWorker() (w *Worker, err error) {
select {
- case w := <-p.free:
- // we already have free worker
+ case w = <-p.free:
return w, nil
default:
- if p.numWorkers < p.cfg.MaxWorkers {
- return p.createWorker()
- }
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
- }
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w := <-p.free:
+ timeout.Stop()
+ return w, nil
}
}
+// replaces dead or expired worker with new instance
+func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
+ go p.destroyWorker(w)
+
+ nw, _ := p.createWorker()
+ p.free <- nw
+}
+
// destroy and remove worker from the pool.
func (p *Pool) destroyWorker(w *Worker) {
- atomic.AddUint64(&p.numWorkers, ^uint64(0))
-
- go func() {
- w.Stop()
+ // detaching
+ p.muw.Lock()
+ for i, wc := range p.workers {
+ if wc == w {
+ p.workers = p.workers[:i+1]
+ break
+ }
+ }
+ p.muw.Unlock()
- p.muw.Lock()
- defer p.muw.Unlock()
+ go w.Stop()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = p.workers[:i+1]
- break
- }
+ select {
+ case <-w.waitDone:
+ // worker is dead
+ case <-time.NewTimer(time.Second * 10).C:
+ // failed to stop process
+ if err := w.Kill(); err != nil {
+ //todo: can't kill or already killed?
}
- }()
+ }
}
-// creates new worker (must be called in a locked state).
+// creates new worker using associated factory. automatically
+// adds worker to the worker list (background)
func (p *Pool) createWorker() (*Worker, error) {
- w, err := p.factory.NewWorker(p.cmd())
+ w, err := p.factory.SpawnWorker(p.cmd())
if err != nil {
return nil, err
}
- atomic.AddUint64(&p.numWorkers, 1)
+ p.muw.Lock()
+ defer p.muw.Unlock()
- go func() {
- p.muw.Lock()
- defer p.muw.Unlock()
- p.workers = append(p.workers, w)
- }()
+ p.workers = append(p.workers, w)
return w, nil
}
diff --git a/pool_test.go b/pool_test.go
new file mode 100644
index 00000000..92a594df
--- /dev/null
+++ b/pool_test.go
@@ -0,0 +1,208 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os/exec"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+ "strconv"
+)
+
+var cfg = Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+}
+
+func Test_NewPool(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+}
+
+func Test_ConfigError(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ Config{
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Nil(t, p)
+ assert.Error(t, err)
+}
+
+func Test_Pool_Echo(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Pool_AllocateTimeout(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
+ NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Millisecond * 50,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ done := make(chan interface{})
+ go func() {
+ _, err := p.Exec(&Payload{Body: []byte("100")})
+ assert.NoError(t, err)
+ close(done)
+ }()
+
+ // to ensure that worker is already busy
+ time.Sleep(time.Millisecond * 10)
+
+ _, err = p.Exec(&Payload{Body: []byte("10")})
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "worker timeout")
+
+ <-done
+ p.Destroy()
+}
+
+//todo: termiante
+
+func Test_Pool_Replace_Worker(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
+ NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxExecutions: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ var lastPID string
+ lastPID = strconv.Itoa(*p.Workers()[0].Pid)
+
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+func Benchmark_Pool_Allocate(b *testing.B) {
+ p, _ := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ for n := 0; n < b.N; n++ {
+ w, err := p.allocateWorker()
+ if err != nil {
+ b.Fail()
+ }
+
+ p.free <- w
+ }
+}
+
+func Benchmark_Pool_Echo(b *testing.B) {
+ p, _ := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pool_Echo_Batched(b *testing.B) {
+ p, _ := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ var wg sync.WaitGroup
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+func Benchmark_Pool_Echo_Replaced(b *testing.B) {
+ p, _ := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxExecutions: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ defer p.Destroy()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
diff --git a/protocol.go b/protocol.go
new file mode 100644
index 00000000..b78f2807
--- /dev/null
+++ b/protocol.go
@@ -0,0 +1,52 @@
+package roadrunner
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/spiral/goridge"
+ "os"
+)
+
+type stopCommand struct {
+ Stop bool `json:"stop"`
+}
+
+type pidCommand struct {
+ Pid int `json:"pid"`
+}
+
+func sendHead(rl goridge.Relay, v interface{}) error {
+ if v == nil {
+ rl.Send(nil, goridge.PayloadControl)
+ }
+
+ if data, ok := v.([]byte); ok {
+ return rl.Send(data, goridge.PayloadControl)
+ }
+
+ data, err := json.Marshal(v)
+ if err != nil {
+ return fmt.Errorf("invalid payload: %s", err)
+ }
+
+ return rl.Send(data, goridge.PayloadControl)
+}
+
+func fetchPID(rl goridge.Relay) (pid int, err error) {
+ if err := sendHead(rl, pidCommand{Pid: os.Getpid()}); err != nil {
+ return 0, err
+ }
+
+ body, p, err := rl.Receive()
+ if !p.HasFlag(goridge.PayloadControl) {
+ return 0, fmt.Errorf("unexpected response, header is missing")
+ }
+
+ link := &pidCommand{}
+ //log.Println(string(body))
+ if err := json.Unmarshal(body, link); err != nil {
+ return 0, err
+ }
+
+ return link.Pid, nil
+}
diff --git a/socket_factory.go b/socket_factory.go
index 5d1b488b..b915973b 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -1,69 +1,72 @@
package roadrunner
import (
- "encoding/json"
"fmt"
"github.com/spiral/goridge"
"net"
- "os"
"os/exec"
"sync"
"time"
+ "github.com/pkg/errors"
)
// SocketFactory connects to external workers using socket server.
type SocketFactory struct {
- ls net.Listener // listens for incoming connections from underlying processes
- tout time.Duration // connection timeout
- mu sync.Mutex // protects socket mapping
- wait map[int]chan *goridge.SocketRelay // sockets which are waiting for process association
+ // listens for incoming connections from underlying processes
+ ls net.Listener
+
+ // relay connection timeout
+ tout time.Duration
+
+ // protects socket mapping
+ mu sync.Mutex
+
+ // sockets which are waiting for process association
+ relays map[int]chan *goridge.SocketRelay
}
-// NewSocketFactory returns SocketFactory attached to a given socket listener. tout specifies for how long factory
-// should wait for incoming relay connection
+// NewSocketFactory returns SocketFactory attached to a given socket listener.
+// tout specifies for how long factory should serve for incoming relay connection
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
f := &SocketFactory{
- ls: ls,
- tout: tout,
- wait: make(map[int]chan *goridge.SocketRelay),
+ ls: ls,
+ tout: tout,
+ relays: make(map[int]chan *goridge.SocketRelay),
}
go f.listen()
+
return f
}
-// NewWorker creates worker and connects it to appropriate relay or returns error
-func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
- w, err = NewWorker(cmd)
- if err != nil {
- return nil, err
+// SpawnWorker creates worker and connects it to appropriate relay or returns error
+func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, workerError error) {
+ if w, workerError = newWorker(cmd); workerError != nil {
+ return nil, workerError
}
if err := w.Start(); err != nil {
- return nil, err
- }
-
- w.Pid = &w.cmd.Process.Pid
- if w.Pid == nil {
- return nil, fmt.Errorf("can't to start worker %s", w)
+ return nil, errors.Wrap(err, "process error")
}
- rl, err := f.waitRelay(*w.Pid, f.tout)
+ rl, err := f.findRelay(w, f.tout)
if err != nil {
- return nil, fmt.Errorf("can't connect to worker %s: %s", w, err)
+ go func(w *Worker) { w.Kill() }(w)
+
+ if wErr := w.Wait(); wErr != nil {
+ err = errors.Wrap(wErr, err.Error())
+ }
+
+ return nil, errors.Wrap(err, "unable to connect to worker")
}
- w.attach(rl)
+ w.rl = rl
+ w.state.set(StateReady)
return w, nil
}
-// Close closes all open factory descriptors.
-func (f *SocketFactory) Close() error {
- return f.ls.Close()
-}
-
-// listen for incoming wait and associate sockets with active workers
+// listens for incoming socket connections
func (f *SocketFactory) listen() {
for {
conn, err := f.ls.Accept()
@@ -79,16 +82,23 @@ func (f *SocketFactory) listen() {
}
// waits for worker to connect over socket and returns associated relay of timeout
-func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketRelay, error) {
+func (f *SocketFactory) findRelay(w *Worker, tout time.Duration) (*goridge.SocketRelay, error) {
timer := time.NewTimer(tout)
- select {
- case rl := <-f.relayChan(pid):
- timer.Stop()
- f.cleanChan(pid)
-
- return rl, nil
- case <-timer.C:
- return nil, fmt.Errorf("relay timer for [%v]", pid)
+ for {
+ select {
+ case rl := <-f.relayChan(*w.Pid):
+ timer.Stop()
+ f.cleanChan(*w.Pid)
+ return rl, nil
+
+ case <-timer.C:
+ return nil, fmt.Errorf("relay timeout")
+
+ case <-w.waitDone:
+ timer.Stop()
+ f.cleanChan(*w.Pid)
+ return nil, fmt.Errorf("worker is gone")
+ }
}
}
@@ -97,10 +107,10 @@ func (f *SocketFactory) relayChan(pid int) chan *goridge.SocketRelay {
f.mu.Lock()
defer f.mu.Unlock()
- rl, ok := f.wait[pid]
+ rl, ok := f.relays[pid]
if !ok {
- f.wait[pid] = make(chan *goridge.SocketRelay)
- return f.wait[pid]
+ f.relays[pid] = make(chan *goridge.SocketRelay)
+ return f.relays[pid]
}
return rl
@@ -111,28 +121,5 @@ func (f *SocketFactory) cleanChan(pid int) {
f.mu.Lock()
defer f.mu.Unlock()
- delete(f.wait, pid)
-}
-
-// send control command to relay and return associated Pid (or error)
-func fetchPID(rl goridge.Relay) (pid int, err error) {
- if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil {
- return 0, err
- }
-
- body, p, err := rl.Receive()
- if !p.HasFlag(goridge.PayloadControl) {
- return 0, fmt.Errorf("unexpected response, `control` header is missing")
- }
-
- link := &PidCommand{}
- if err := json.Unmarshal(body, link); err != nil {
- return 0, err
- }
-
- if link.Parent != os.Getpid() {
- return 0, fmt.Errorf("integrity error, parent process does not match")
- }
-
- return link.Pid, nil
+ delete(f.relays, pid)
}
diff --git a/socket_factory_test.go b/socket_factory_test.go
new file mode 100644
index 00000000..c64c607f
--- /dev/null
+++ b/socket_factory_test.go
@@ -0,0 +1,274 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "net"
+ "os/exec"
+ "runtime"
+ "testing"
+ "time"
+)
+
+func Test_Tcp_Start(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer ls.Close()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+
+ w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ w.Stop()
+}
+
+func Test_Tcp_Failboot(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer ls.Close()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/failboot.php")
+
+ w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Tcp_Timeout(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer ls.Close()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0")
+
+ w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "relay timeout")
+}
+
+func Test_Tcp_Invalid(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer ls.Close()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/invalid.php")
+
+ w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Tcp_Broken(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer ls.Close()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/client.php", "broken", "tcp")
+
+ w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ go func() {
+ err := w.Wait()
+
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+ defer w.Stop()
+
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res)
+}
+
+func Test_Tcp_Echo(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer ls.Close()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+
+ w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
+
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+//todo: test relay timeout
+//todo: test dead workers
+
+//func Test_Tcp_Errored(t *testing.T) {
+// defer time.Sleep(time.Millisecond * 10) // free socket
+//
+// ls, err := net.Listen("tcp", "localhost:9007")
+// if assert.NoError(t, err) {
+// defer ls.Destroy()
+// } else {
+// t.Skip("socket is busy")
+// }
+//
+// cmd := exec.Command("php", "tests/invalid.php")
+// w, err := NewSocketFactory(ls).SpawnWorker(cmd)
+// assert.Nil(t, w)
+// assert.NotNil(t, err)
+//
+// assert.Equal(t, "unable to connect to worker: worker is gone", err.Error())
+//}
+//
+//func Test_Tcp_Broken(t *testing.T) {
+// defer time.Sleep(time.Millisecond * 10) // free socket
+//
+// ls, err := net.Listen("tcp", "localhost:9007")
+// if assert.NoError(t, err) {
+// defer ls.Destroy()
+// } else {
+// t.Skip("socket is busy")
+// }
+//
+// cmd := exec.Command("php", "tests/client.php", "broken", "tcp")
+// w, err := NewSocketFactory(ls).SpawnWorker(cmd)
+// defer w.Destroy()
+//
+// r, ctx, err := w.Exec([]byte("hello"), nil)
+// assert.Nil(t, r)
+// assert.NotNil(t, err)
+// assert.Nil(t, ctx)
+//
+// assert.IsType(t, WorkerError(errors.New("")), err)
+// assert.Contains(t, err.Error(), "undefined_function()")
+//}
+//
+
+func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if err == nil {
+ defer ls.Close()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ f := NewSocketFactory(ls, time.Minute)
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+
+ w, _ := f.SpawnWorker(cmd)
+ go func() {
+ if w.Wait() != nil {
+ b.Fail()
+ }
+ }()
+
+ w.Stop()
+ }
+}
+
+func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if err == nil {
+ defer ls.Close()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+
+ w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ go func() {
+ w.Wait()
+ }()
+ defer w.Stop()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
+ if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ b.Skip("not supported on " + runtime.GOOS)
+ }
+
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer ls.Close()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ f := NewSocketFactory(ls, time.Minute)
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+
+ w, _ := f.SpawnWorker(cmd)
+ go func() {
+ if w.Wait() != nil {
+ b.Fail()
+ }
+ }()
+
+ w.Stop()
+ }
+}
+
+func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
+ if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
+ b.Skip("not supported on " + runtime.GOOS)
+ }
+
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer ls.Close()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+
+ w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ go func() {
+ w.Wait()
+ }()
+ defer w.Stop()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
diff --git a/source/Worker.php b/source/Worker.php
index d31b48bd..57e8fbf5 100644
--- a/source/Worker.php
+++ b/source/Worker.php
@@ -87,6 +87,7 @@ class Worker
$this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
}
+ //todo: null payload?
$this->relay->send($payload, Relay::PAYLOAD_RAW);
}
@@ -136,26 +137,23 @@ class Worker
return true;
}
- $parsed = json_decode($body, true);
- if ($parsed === false) {
+ $p = json_decode($body, true);
+ if ($p === false) {
throw new RoadRunnerException("invalid task context, JSON payload is expected");
}
// PID negotiation (socket connections only)
- if (!empty($parsed['pid'])) {
- $this->relay->send(json_encode([
- 'pid' => getmypid(),
- 'parent' => $parsed['pid'],
- ]), Relay::PAYLOAD_CONTROL);
+ if (!empty($p['pid'])) {
+ $this->relay->send(sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL);
}
// termination request
- if (!empty($parsed['terminate'])) {
+ if (!empty($p['stop'])) {
return false;
}
// not a command but execution context
- $context = $parsed;
+ $context = $p;
return true;
}
diff --git a/state.go b/state.go
index c02ae7e7..2918f396 100644
--- a/state.go
+++ b/state.go
@@ -1,39 +1,101 @@
package roadrunner
-// State is current state int.
-type State int
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// State represents worker status and updated time.
+type State interface {
+ // Value returns state value
+ Value() int64 //todo: change to state value
+
+ // NumExecs shows how many times worker was invoked
+ NumExecs() uint64
+
+ // Updated indicates a moment updated last state change
+ Updated() time.Time
+}
const (
// StateInactive - no associated process
- StateInactive State = iota
- // StateBooting - relay attached but w.Start() not executed
- StateBooting
+ StateInactive int64 = iota
// StateReady - ready for job.
StateReady
// StateWorking - working on given payload.
StateWorking
// StateStopped - process has been terminated
StateStopped
- // StateError - error State (can't be used)
- StateError
+ // StateErrored - error state (can't be used)
+ StateErrored
)
+type state struct {
+ mu sync.RWMutex
+ value int64
+ numExecs uint64
+ updated time.Time
+}
+
+func newState(value int64) *state {
+ return &state{value: value, updated: time.Now()}
+}
+
// String returns current state as string.
-func (s State) String() string {
- switch s {
+func (s *state) String() string {
+ switch s.value {
case StateInactive:
return "inactive"
- case StateBooting:
- return "booting"
case StateReady:
return "ready"
case StateWorking:
return "working"
case StateStopped:
return "stopped"
- case StateError:
- return "error"
+ case StateErrored:
+ return "errored"
}
return "undefined"
}
+
+// Value state returns state value
+func (s *state) Value() int64 {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ return s.value
+}
+
+// IsActive returns true if worker not Inactive or Stopped
+func (s *state) IsActive() bool {
+ state := s.Value()
+ return state == StateWorking || state == StateReady
+}
+
+// Updated indicates a moment updated last state change
+func (s *state) Updated() time.Time {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ return s.updated
+}
+
+func (s *state) NumExecs() uint64 {
+ return atomic.LoadUint64(&s.numExecs)
+}
+
+// change state value (status)
+func (s *state) set(value int64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.value = value
+ s.updated = time.Now()
+}
+
+// register new execution atomically
+func (s *state) registerExec() {
+ atomic.AddUint64(&s.numExecs, 1)
+}
diff --git a/state_test.go b/state_test.go
new file mode 100644
index 00000000..965ba6b6
--- /dev/null
+++ b/state_test.go
@@ -0,0 +1,21 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_NewState(t *testing.T) {
+ st := newState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+ assert.NotEqual(t, 0, st.Updated().Unix())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, newState(StateInactive).IsActive())
+ assert.True(t, newState(StateReady).IsActive())
+ assert.True(t, newState(StateWorking).IsActive())
+ assert.False(t, newState(StateStopped).IsActive())
+ assert.False(t, newState(StateErrored).IsActive())
+}
diff --git a/tests/broken-client.php b/tests/broken-client.php
deleted file mode 100644
index ed5bde20..00000000
--- a/tests/broken-client.php
+++ /dev/null
@@ -1,17 +0,0 @@
-<?php
-
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-/**
- * echo client over pipes.
- */
-ini_set('display_errors', 'stderr');
-require "vendor/autoload.php";
-
-$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-
-while ($in = $rr->receive($ctx)) {
- echo undefined_function();
- $rr->send((string)$in);
-} \ No newline at end of file
diff --git a/tests/broken.php b/tests/broken.php
new file mode 100644
index 00000000..b1a3839e
--- /dev/null
+++ b/tests/broken.php
@@ -0,0 +1,14 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ echo undefined_function();
+ $rr->send((string)$in);
+} \ No newline at end of file
diff --git a/tests/client.php b/tests/client.php
new file mode 100644
index 00000000..1f1d21b1
--- /dev/null
+++ b/tests/client.php
@@ -0,0 +1,36 @@
+<?php
+
+use Spiral\Goridge;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/vendor/autoload.php";
+
+if (count($argv) < 3) {
+ die("need 2 arguments");
+}
+
+list($test, $goridge) = [$argv[1], $argv[2]];
+
+switch ($goridge) {
+ case "pipes":
+ $relay = new Goridge\StreamRelay(STDIN, STDOUT);
+ break;
+
+ case "tcp":
+ $relay = new Goridge\SocketRelay("localhost", 9007);
+ break;
+
+ case "unix":
+ $relay = new Goridge\SocketRelay(
+ "sock.unix",
+ null,
+ Goridge\SocketRelay::SOCK_UNIX
+ );
+
+ break;
+
+ default:
+ die("invalid protocol selection");
+}
+
+require_once sprintf("%s/%s.php", __DIR__, $test); \ No newline at end of file
diff --git a/tests/delay.php b/tests/delay.php
new file mode 100644
index 00000000..bfde2fc4
--- /dev/null
+++ b/tests/delay.php
@@ -0,0 +1,18 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ try {
+ usleep($in * 1000);
+ $rr->send('');
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+} \ No newline at end of file
diff --git a/tests/echo-client.php b/tests/echo.php
index 22761862..ba58ff30 100644
--- a/tests/echo-client.php
+++ b/tests/echo.php
@@ -1,15 +1,12 @@
<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
use Spiral\Goridge;
use Spiral\RoadRunner;
-/**
- * echo client over pipes.
- */
-ini_set('display_errors', 'stderr');
-require "vendor/autoload.php";
-
-$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$rr = new RoadRunner\Worker($relay);
while ($in = $rr->receive($ctx)) {
try {
diff --git a/tests/error-client.php b/tests/error-client.php
deleted file mode 100644
index 113d1197..00000000
--- a/tests/error-client.php
+++ /dev/null
@@ -1,16 +0,0 @@
-<?php
-
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-/**
- * echo client over pipes.
- */
-ini_set('display_errors', 'stderr');
-require "vendor/autoload.php";
-
-$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-
-while ($in = $rr->receive($ctx)) {
- $rr->error((string)$in);
-} \ No newline at end of file
diff --git a/tests/error.php b/tests/error.php
new file mode 100644
index 00000000..ebd3418b
--- /dev/null
+++ b/tests/error.php
@@ -0,0 +1,13 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ $rr->error((string)$in);
+} \ No newline at end of file
diff --git a/tests/failboot.php b/tests/failboot.php
new file mode 100644
index 00000000..fa8b96f6
--- /dev/null
+++ b/tests/failboot.php
@@ -0,0 +1,3 @@
+<?php
+ini_set('display_errors', 'stderr');
+throw new Error("failboot error"); \ No newline at end of file
diff --git a/tests/pid.php b/tests/pid.php
new file mode 100644
index 00000000..a8cfa229
--- /dev/null
+++ b/tests/pid.php
@@ -0,0 +1,17 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ try {
+ $rr->send((string)getmypid());
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+} \ No newline at end of file
diff --git a/tests/slow-client.php b/tests/slow-client.php
new file mode 100644
index 00000000..f09142b5
--- /dev/null
+++ b/tests/slow-client.php
@@ -0,0 +1,38 @@
+<?php
+
+use Spiral\Goridge;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/vendor/autoload.php";
+
+if (count($argv) < 3) {
+ die("need 2 arguments");
+}
+
+list($test, $goridge, $bootDelay, $shutdownDelay) = [$argv[1], $argv[2], $argv[3], $argv[4]];
+
+switch ($goridge) {
+ case "pipes":
+ $relay = new Goridge\StreamRelay(STDIN, STDOUT);
+ break;
+
+ case "tcp":
+ $relay = new Goridge\SocketRelay("localhost", 9007);
+ break;
+
+ case "unix":
+ $relay = new Goridge\SocketRelay(
+ "sock.unix",
+ null,
+ Goridge\SocketRelay::SOCK_UNIX
+ );
+
+ break;
+
+ default:
+ die("invalid protocol selection");
+}
+
+usleep($bootDelay * 1000);
+require_once sprintf("%s/%s.php", __DIR__, $test);
+usleep($shutdownDelay * 1000); \ No newline at end of file
diff --git a/worker.go b/worker.go
index 8960b3fa..aee28c1e 100644
--- a/worker.go
+++ b/worker.go
@@ -2,167 +2,233 @@ package roadrunner
import (
"bytes"
- "encoding/json"
"fmt"
"github.com/spiral/goridge"
- "io"
+ "os"
"os/exec"
"strconv"
"strings"
"sync"
- "sync/atomic"
- "time"
+ "github.com/pkg/errors"
)
// Worker - supervised process with api over goridge.Relay.
type Worker struct {
- // State current worker state.
- State State
+ // Pid of the process, points to Pid of underlying process and
+ // can be nil while process is not started.
+ Pid *int
- // Last time worker State has changed
- Last time.Time
+ // state holds information about current worker state,
+ // number of worker executions, last status change time.
+ // publicly this object is read-only and protected using Mutex
+ // and atomic counter.
+ state *state
- // NumExecutions how many times worker have been invoked.
- NumExecutions uint64
+ // underlying command with associated process, command must be
+ // provided to worker from outside in non-started form. Cmd
+ // stdErr pipe will be handled by worker to aggregate error message.
+ cmd *exec.Cmd
- // Pid contains process ID and empty until worker is started.
- Pid *int
+ // err aggregates stderr output from underlying process. Value can be
+ // read only once command is completed and all pipes are closed.
+ err *bytes.Buffer
+
+ // channel is being closed once command is complete.
+ waitDone chan interface{}
+
+ // contains information about resulted process state.
+ endState *os.ProcessState
- cmd *exec.Cmd // underlying command process
- err *bytes.Buffer // aggregates stderr
- rl goridge.Relay // communication bus with underlying process
- mu sync.RWMutex // ensures than only one execution can be run at once
+ // ensures than only one execution can be run at once.
+ mu sync.Mutex
+
+ // communication bus with underlying process.
+ rl goridge.Relay
}
-// NewWorker creates new worker
-func NewWorker(cmd *exec.Cmd) (*Worker, error) {
- w := &Worker{
- cmd: cmd,
- err: bytes.NewBuffer(nil),
- State: StateInactive,
+// newWorker creates new worker over given exec.cmd.
+func newWorker(cmd *exec.Cmd) (*Worker, error) {
+ if cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
}
- if w.cmd.Process != nil {
- return nil, fmt.Errorf("can't attach to running process")
+ w := &Worker{
+ cmd: cmd,
+ err: new(bytes.Buffer),
+ waitDone: make(chan interface{}),
+ state: newState(StateInactive),
}
+ // piping all stderr to command buffer
+ w.cmd.Stderr = w.err
+
return w, nil
}
+// State return read-only worker state object, state can be used to safely access
+// worker status, time when status changed and number of worker executions.
+func (w *Worker) State() State {
+ return w.state
+}
+
// String returns worker description.
func (w *Worker) String() string {
- state := w.State.String()
-
+ state := w.state.String()
if w.Pid != nil {
- state = state + ", pid:" + strconv.Itoa(*w.Pid)
+ state = state + ", pid.php:" + strconv.Itoa(*w.Pid)
}
- return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions)
+ return fmt.Sprintf(
+ "(`%s` [%s], numExecs: %v)",
+ strings.Join(w.cmd.Args, " "),
+ state,
+ w.state.NumExecs(),
+ )
}
// Start underlying process or return error
func (w *Worker) Start() error {
- stderr, err := w.cmd.StderrPipe()
- if err != nil {
- w.setState(StateError)
- return err
+ if w.cmd.Process != nil {
+ return fmt.Errorf("process already running")
}
- // copying all process errors into buffer space
- go io.Copy(w.err, stderr)
-
if err := w.cmd.Start(); err != nil {
- w.setState(StateError)
- return w.mockError(err)
+ close(w.waitDone)
+
+ return err
}
- w.setState(StateReady)
+ w.Pid = &w.cmd.Process.Pid
+
+ // relays for process to complete
+ go func() {
+ w.endState, _ = w.cmd.Process.Wait()
+ if w.waitDone != nil {
+ w.state.set(StateStopped)
+ close(w.waitDone)
+
+ if w.rl != nil {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ w.rl.Close()
+ }
+ }
+ }()
return nil
}
-// Execute command and return result and result context.
-func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
- w.mu.Lock()
- defer w.mu.Unlock()
+// Wait must be called once for each worker, call will be released once worker is
+// complete and will return process error (if any), if stderr is presented it's value
+// will be wrapped as WorkerError. Method will return error code if php process fails
+// to find or start the script.
+func (w *Worker) Wait() error {
+ <-w.waitDone
- if w.State != StateReady {
- return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State)
- }
+ // ensure that all pipe descriptors are closed
+ w.cmd.Wait()
- w.setState(StateReady)
- atomic.AddUint64(&w.NumExecutions, 1)
+ if w.endState.Success() {
+ return nil
+ }
- if ctx != nil {
- if data, err := json.Marshal(ctx); err == nil {
- w.rl.Send(data, goridge.PayloadControl)
- } else {
- return nil, nil, fmt.Errorf("invalid context: %s", err)
- }
- } else {
- w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty)
+ if w.err.Len() != 0 {
+ return errors.New(w.err.String())
}
- w.rl.Send(body, 0)
+ // generic process error
+ return &exec.ExitError{ProcessState: w.endState}
+}
- rCtx, p, err := w.rl.Receive()
+// Destroy sends soft termination command to the worker to properly stop the process.
+func (w *Worker) Stop() error {
+ select {
+ case <-w.waitDone:
+ return nil
+ default:
+ w.mu.Lock()
+ defer w.mu.Unlock()
- if !p.HasFlag(goridge.PayloadControl) {
- return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
- }
+ w.state.set(StateInactive)
+ err := sendHead(w.rl, &stopCommand{Stop: true})
- if p.HasFlag(goridge.PayloadError) {
- w.setState(StateReady)
- return nil, nil, JobError(rCtx)
+ <-w.waitDone
+ return err
}
+}
- if resp, p, err = w.rl.Receive(); err != nil {
- w.setState(StateError)
- return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err))
- }
+// Kill kills underlying process, make sure to call Wait() func to gather
+// error log from the stderr
+func (w *Worker) Kill() error {
+ select {
+ case <-w.waitDone:
+ return nil
+ default:
+ w.mu.Lock()
+ defer w.mu.Unlock()
- w.setState(StateReady)
- return resp, rCtx, nil
+ w.state.set(StateInactive)
+ err := w.cmd.Process.Kill()
+
+ <-w.waitDone
+ return err
+ }
}
-// Stop underlying process or return error.
-func (w *Worker) Stop() {
+func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
w.mu.Lock()
defer w.mu.Unlock()
- w.setState(StateInactive)
+ if rqs == nil {
+ return nil, fmt.Errorf("payload can not be empty")
+ }
- go func() {
- sendCommand(w.rl, &TerminateCommand{Terminate: true})
- }()
+ if w.state.Value() != StateReady {
+ return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value())
+ }
- w.cmd.Wait()
- w.rl.Close()
+ w.state.set(StateWorking)
+ defer w.state.registerExec()
- w.setState(StateStopped)
-}
+ rsp, err = w.execPayload(rqs)
-// attach payload/control relay to the worker.
-func (w *Worker) attach(rl goridge.Relay) {
- w.mu.Lock()
- defer w.mu.Unlock()
+ if err != nil {
+ if _, ok := err.(JobError); !ok {
+ w.state.set(StateErrored)
+ return nil, err
+ }
+ }
- w.rl = rl
- w.setState(StateBooting)
+ w.state.set(StateReady)
+ return rsp, err
}
-// sets worker State and it's context (non blocking!).
-func (w *Worker) setState(state State) {
- // safer?
- w.State = state
- w.Last = time.Now()
-}
+func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
+ if err := sendHead(w.rl, rqs.Head); err != nil {
+ return nil, errors.Wrap(err, "header error")
+ }
-// mockError attaches worker specific error (from stderr) to parent error
-func (w *Worker) mockError(err error) WorkerError {
- if w.err.Len() != 0 {
- return WorkerError(w.err.String())
+ w.rl.Send(rqs.Body, 0)
+
+ var pr goridge.Prefix
+ rsp = new(Payload)
+
+ if rsp.Head, pr, err = w.rl.Receive(); err != nil {
+ return nil, errors.Wrap(err, "worker error")
+ }
+
+ if !pr.HasFlag(goridge.PayloadControl) {
+ return nil, fmt.Errorf("mailformed worker response")
+ }
+
+ if pr.HasFlag(goridge.PayloadError) {
+ return nil, JobError(rsp.Head)
+ }
+
+ if rsp.Body, pr, err = w.rl.Receive(); err != nil {
+ return nil, errors.Wrap(err, "worker error")
}
- return WorkerError(err.Error())
+ return rsp, nil
}
diff --git a/worker_test.go b/worker_test.go
index d4d24364..0cb6f2d4 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -1,130 +1,141 @@
package roadrunner
import (
- "github.com/spiral/goridge"
"github.com/stretchr/testify/assert"
- "io"
"os/exec"
"testing"
"time"
)
-func getPipes(cmd *exec.Cmd) (io.ReadCloser, io.WriteCloser) {
- in, err := cmd.StdoutPipe()
- if err != nil {
- panic(err)
- }
+func Test_GetState(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- out, err := cmd.StdinPipe()
- if err != nil {
- panic(err)
- }
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
- return in, out
-}
-
-func TestOnStarted(t *testing.T) {
- pr := exec.Command("php", "tests/echo-client.php")
- pr.Start()
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
- _, err := NewWorker(pr)
- assert.NotNil(t, err)
- assert.Equal(t, "can't attach to running process", err.Error())
+ assert.Equal(t, StateReady, w.State().Value())
+ w.Stop()
+ assert.Equal(t, StateStopped, w.State().Value())
}
-func TestNewWorkerState(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
- assert.Equal(t, StateInactive, w.State)
+func Test_Echo(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Equal(t, StateBooting, w.State)
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
- assert.Nil(t, w.Start())
- assert.Equal(t, StateReady, w.State)
-}
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
-func TestStop(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
-
- w.Stop()
- assert.Equal(t, StateStopped, w.State)
+ assert.Equal(t, "hello", res.String())
}
-func TestEcho(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
+func Test_Echo_Slow(t *testing.T) {
+ cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10")
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
+
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
- r, ctx, err := w.Execute([]byte("hello"), nil)
assert.Nil(t, err)
- assert.Nil(t, ctx)
- assert.Equal(t, "hello", string(r))
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
+
+ assert.Equal(t, "hello", res.String())
}
-func TestError(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/error-client.php"))
- assert.Nil(t, err)
+func Test_Broken(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ err := w.Wait()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+ defer w.Stop()
- r, ctx, err := w.Execute([]byte("hello"), nil)
- assert.Nil(t, r)
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+ assert.Nil(t, res)
assert.NotNil(t, err)
- assert.Nil(t, ctx)
+}
- assert.IsType(t, JobError{}, err)
- assert.Equal(t, "hello", err.Error())
+func Test_OnStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ assert.Nil(t, cmd.Start())
+
+ w, err := newWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
}
-func TestBroken(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/broken-client.php"))
- assert.Nil(t, err)
+func Test_Error(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "error", "pipes")
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
- r, ctx, err := w.Execute([]byte("hello"), nil)
- assert.Nil(t, r)
+ res, err := w.Exec(&Payload{Body: []byte("hello")})
+ assert.Nil(t, res)
assert.NotNil(t, err)
- assert.Nil(t, ctx)
- assert.IsType(t, WorkerError(""), err)
- assert.Contains(t, err.Error(), "undefined_function()")
+ assert.IsType(t, JobError{}, err)
+ assert.Equal(t, "hello", err.Error())
}
-func TestNumExecutions(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
+func Test_NumExecs(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
- w.Execute([]byte("hello"), nil)
- assert.Equal(t, uint64(1), w.NumExecutions)
+ w.Exec(&Payload{Body: []byte("hello")})
+ assert.Equal(t, uint64(1), w.State().NumExecs())
- w.Execute([]byte("hello"), nil)
- assert.Equal(t, uint64(2), w.NumExecutions)
+ w.Exec(&Payload{Body: []byte("hello")})
+ assert.Equal(t, uint64(2), w.State().NumExecs())
- w.Execute([]byte("hello"), nil)
- assert.Equal(t, uint64(3), w.NumExecutions)
+ w.Exec(&Payload{Body: []byte("hello")})
+ assert.Equal(t, uint64(3), w.State().NumExecs())
}
-func TestLastExecution(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
+func Test_StateUpdated(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer w.Stop()
tm := time.Now()
- w.Execute([]byte("hello"), nil)
- assert.True(t, w.Last.After(tm))
+ time.Sleep(time.Millisecond)
+
+ w.Exec(&Payload{Body: []byte("hello")})
+ assert.True(t, w.State().Updated().After(tm))
}