summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-27 16:15:11 +0300
committerGitHub <[email protected]>2020-10-27 16:15:11 +0300
commit47a570c220a36ae7b770ea594a41637fa31fc8e8 (patch)
treedb4c7e348c6b03c478a907cf89c8baaf68195d73
parent105bde0e0c1a7c133d1daa10603ca5ce9a9ade4d (diff)
parent27392d90f3b208adc5215ce876698650a4188463 (diff)
Merge pull request #377 from spiral/feature/pool_supervisor
Feature/pool supervisor
-rwxr-xr-x.github/workflows/ci-build.yml2
-rwxr-xr-xcomposer.lock167
-rwxr-xr-xerrors/errors.go16
-rwxr-xr-xerrors/errors_test.go177
-rwxr-xr-xerrors/go.mod3
-rwxr-xr-xerrors/marshal.go1
-rwxr-xr-xpipe_factory.go5
-rwxr-xr-xplugins/factory/tests/plugin_2.go4
-rwxr-xr-xpool.go18
-rwxr-xr-xsocket_factory.go4
-rwxr-xr-xstate.go1
-rwxr-xr-xstatic_pool.go161
-rwxr-xr-xstatic_pool_test.go2
-rwxr-xr-xsupervisor_pool.go94
-rw-r--r--supervisor_test.go150
-rwxr-xr-xsync_worker.go87
-rw-r--r--tests/memleak.php15
-rw-r--r--tests/sleep.php15
-rwxr-xr-xworker.go4
-rwxr-xr-xworker_test.go2
-rwxr-xr-xworker_watcher.go62
21 files changed, 593 insertions, 397 deletions
diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
index 585858c1..94549c37 100755
--- a/.github/workflows/ci-build.yml
+++ b/.github/workflows/ci-build.yml
@@ -21,7 +21,7 @@ jobs:
go-version: ${{ matrix.go }}
- name: Set up PHP ${{ matrix.php }}
- uses: shivammathur/setup-php@v1
+ uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
extensions: dom
diff --git a/composer.lock b/composer.lock
index 7a1094b1..13e6af6b 100755
--- a/composer.lock
+++ b/composer.lock
@@ -89,6 +89,14 @@
"psr-17",
"psr-7"
],
+ "support": {
+ "chat": "https://laminas.dev/chat",
+ "docs": "https://docs.laminas.dev/laminas-diactoros/",
+ "forum": "https://discourse.laminas.dev",
+ "issues": "https://github.com/laminas/laminas-diactoros/issues",
+ "rss": "https://github.com/laminas/laminas-diactoros/releases.atom",
+ "source": "https://github.com/laminas/laminas-diactoros"
+ },
"funding": [
{
"url": "https://funding.communitybridge.org/projects/laminas-project",
@@ -143,6 +151,12 @@
"laminas",
"zf"
],
+ "support": {
+ "forum": "https://discourse.laminas.dev/",
+ "issues": "https://github.com/laminas/laminas-zendframework-bridge/issues",
+ "rss": "https://github.com/laminas/laminas-zendframework-bridge/releases.atom",
+ "source": "https://github.com/laminas/laminas-zendframework-bridge"
+ },
"funding": [
{
"url": "https://funding.communitybridge.org/projects/laminas-project",
@@ -198,6 +212,10 @@
"container-interop",
"psr"
],
+ "support": {
+ "issues": "https://github.com/php-fig/container/issues",
+ "source": "https://github.com/php-fig/container/tree/master"
+ },
"time": "2017-02-14T16:28:37+00:00"
},
{
@@ -250,6 +268,9 @@
"request",
"response"
],
+ "support": {
+ "source": "https://github.com/php-fig/http-factory/tree/master"
+ },
"time": "2019-04-30T12:38:16+00:00"
},
{
@@ -300,6 +321,9 @@
"request",
"response"
],
+ "support": {
+ "source": "https://github.com/php-fig/http-message/tree/master"
+ },
"time": "2016-08-06T14:39:51+00:00"
},
{
@@ -345,20 +369,24 @@
}
],
"description": "High-performance PHP-to-Golang RPC bridge",
+ "support": {
+ "issues": "https://github.com/spiral/goridge-php/issues",
+ "source": "https://github.com/spiral/goridge-php/tree/v2.4.5"
+ },
"time": "2020-08-14T14:28:30+00:00"
},
{
"name": "symfony/console",
- "version": "v5.1.6",
+ "version": "v5.1.7",
"source": {
"type": "git",
"url": "https://github.com/symfony/console.git",
- "reference": "04c3a31fe8ea94b42c9e2d1acc93d19782133b00"
+ "reference": "ae789a8a2ad189ce7e8216942cdb9b77319f5eb8"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/console/zipball/04c3a31fe8ea94b42c9e2d1acc93d19782133b00",
- "reference": "04c3a31fe8ea94b42c9e2d1acc93d19782133b00",
+ "url": "https://api.github.com/repos/symfony/console/zipball/ae789a8a2ad189ce7e8216942cdb9b77319f5eb8",
+ "reference": "ae789a8a2ad189ce7e8216942cdb9b77319f5eb8",
"shasum": ""
},
"require": {
@@ -424,6 +452,9 @@
],
"description": "Symfony Console Component",
"homepage": "https://symfony.com",
+ "support": {
+ "source": "https://github.com/symfony/console/tree/v5.1.7"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -438,24 +469,24 @@
"type": "tidelift"
}
],
- "time": "2020-09-18T14:27:32+00:00"
+ "time": "2020-10-07T15:23:00+00:00"
},
{
"name": "symfony/polyfill-ctype",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-ctype.git",
- "reference": "1c302646f6efc070cd46856e600e5e0684d6b454"
+ "reference": "f4ba089a5b6366e453971d3aad5fe8e897b37f41"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/1c302646f6efc070cd46856e600e5e0684d6b454",
- "reference": "1c302646f6efc070cd46856e600e5e0684d6b454",
+ "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/f4ba089a5b6366e453971d3aad5fe8e897b37f41",
+ "reference": "f4ba089a5b6366e453971d3aad5fe8e897b37f41",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-ctype": "For best performance"
@@ -463,7 +494,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -500,6 +531,9 @@
"polyfill",
"portable"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-ctype/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -514,24 +548,24 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-intl-grapheme",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-intl-grapheme.git",
- "reference": "b740103edbdcc39602239ee8860f0f45a8eb9aa5"
+ "reference": "c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/b740103edbdcc39602239ee8860f0f45a8eb9aa5",
- "reference": "b740103edbdcc39602239ee8860f0f45a8eb9aa5",
+ "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c",
+ "reference": "c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-intl": "For best performance"
@@ -539,7 +573,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -578,6 +612,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-intl-grapheme/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -592,24 +629,24 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-intl-normalizer",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-intl-normalizer.git",
- "reference": "37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e"
+ "reference": "727d1096295d807c309fb01a851577302394c897"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e",
- "reference": "37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e",
+ "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/727d1096295d807c309fb01a851577302394c897",
+ "reference": "727d1096295d807c309fb01a851577302394c897",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-intl": "For best performance"
@@ -617,7 +654,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -659,6 +696,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-intl-normalizer/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -673,24 +713,24 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-mbstring",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-mbstring.git",
- "reference": "a6977d63bf9a0ad4c65cd352709e230876f9904a"
+ "reference": "39d483bdf39be819deabf04ec872eb0b2410b531"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/a6977d63bf9a0ad4c65cd352709e230876f9904a",
- "reference": "a6977d63bf9a0ad4c65cd352709e230876f9904a",
+ "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/39d483bdf39be819deabf04ec872eb0b2410b531",
+ "reference": "39d483bdf39be819deabf04ec872eb0b2410b531",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-mbstring": "For best performance"
@@ -698,7 +738,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -736,6 +776,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -750,29 +793,29 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-php73",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-php73.git",
- "reference": "fffa1a52a023e782cdcc221d781fe1ec8f87fcca"
+ "reference": "8ff431c517be11c78c48a39a66d37431e26a6bed"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-php73/zipball/fffa1a52a023e782cdcc221d781fe1ec8f87fcca",
- "reference": "fffa1a52a023e782cdcc221d781fe1ec8f87fcca",
+ "url": "https://api.github.com/repos/symfony/polyfill-php73/zipball/8ff431c517be11c78c48a39a66d37431e26a6bed",
+ "reference": "8ff431c517be11c78c48a39a66d37431e26a6bed",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -812,6 +855,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-php73/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -826,29 +872,29 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-php80",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-php80.git",
- "reference": "d87d5766cbf48d72388a9f6b85f280c8ad51f981"
+ "reference": "e70aa8b064c5b72d3df2abd5ab1e90464ad009de"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/d87d5766cbf48d72388a9f6b85f280c8ad51f981",
- "reference": "d87d5766cbf48d72388a9f6b85f280c8ad51f981",
+ "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/e70aa8b064c5b72d3df2abd5ab1e90464ad009de",
+ "reference": "e70aa8b064c5b72d3df2abd5ab1e90464ad009de",
"shasum": ""
},
"require": {
- "php": ">=7.0.8"
+ "php": ">=7.1"
},
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -892,6 +938,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-php80/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -906,7 +955,7 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/service-contracts",
@@ -968,6 +1017,9 @@
"interoperability",
"standards"
],
+ "support": {
+ "source": "https://github.com/symfony/service-contracts/tree/master"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -986,7 +1038,7 @@
},
{
"name": "symfony/string",
- "version": "v5.1.6",
+ "version": "v5.1.7",
"source": {
"type": "git",
"url": "https://github.com/symfony/string.git",
@@ -1053,6 +1105,9 @@
"utf-8",
"utf8"
],
+ "support": {
+ "source": "https://github.com/symfony/string/tree/v5.1.7"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -1073,16 +1128,16 @@
"packages-dev": [
{
"name": "phpstan/phpstan",
- "version": "0.12.46",
+ "version": "0.12.52",
"source": {
"type": "git",
"url": "https://github.com/phpstan/phpstan.git",
- "reference": "9419738e20f0c49757be05d22969c1c44c1dff3b"
+ "reference": "e96dd5e7ae9aefed663bc7e285ad96792b67eadc"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/phpstan/phpstan/zipball/9419738e20f0c49757be05d22969c1c44c1dff3b",
- "reference": "9419738e20f0c49757be05d22969c1c44c1dff3b",
+ "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e96dd5e7ae9aefed663bc7e285ad96792b67eadc",
+ "reference": "e96dd5e7ae9aefed663bc7e285ad96792b67eadc",
"shasum": ""
},
"require": {
@@ -1111,6 +1166,10 @@
"MIT"
],
"description": "PHPStan - PHP Static Analysis Tool",
+ "support": {
+ "issues": "https://github.com/phpstan/phpstan/issues",
+ "source": "https://github.com/phpstan/phpstan/tree/0.12.52"
+ },
"funding": [
{
"url": "https://github.com/ondrejmirtes",
@@ -1125,7 +1184,7 @@
"type": "tidelift"
}
],
- "time": "2020-09-28T09:48:55+00:00"
+ "time": "2020-10-25T07:23:44+00:00"
}
],
"aliases": [],
@@ -1139,5 +1198,5 @@
"ext-curl": "*"
},
"platform-dev": [],
- "plugin-api-version": "1.1.0"
+ "plugin-api-version": "2.0.0"
}
diff --git a/errors/errors.go b/errors/errors.go
index def408d8..ec621b67 100755
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -39,22 +39,18 @@ type Kind uint8
// Kinds of errors.
const (
Undefined Kind = iota // Undefined error.
- Network
- Other
- Test
+ ErrWatcherStopped
+ TimeOut
)
func (k Kind) String() string {
switch k {
case Undefined:
return "UNDEF"
- case Network:
- return "Network error"
- case Other:
- return "Other"
- case Test:
- return "Test"
-
+ case ErrWatcherStopped:
+ return "Watcher stopped"
+ case TimeOut:
+ return "TimedOut"
}
return "unknown error kind"
}
diff --git a/errors/errors_test.go b/errors/errors_test.go
deleted file mode 100755
index 50d3d422..00000000
--- a/errors/errors_test.go
+++ /dev/null
@@ -1,177 +0,0 @@
-// +build !debug
-
-package errors
-
-import (
- "fmt"
- "io"
- "os"
- "os/exec"
- "testing"
-)
-
-func TestDebug(t *testing.T) {
- // Test with -tags debug to run the tests in debug_test.go
- cmd := exec.Command("go", "test", "-tags", "prod")
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- if err := cmd.Run(); err != nil {
- t.Fatalf("external go test failed: %v", err)
- }
-}
-
-func TestMarshal(t *testing.T) {
- // Single error. No user is set, so we will have a zero-length field inside.
- e1 := E(Op("Get"), Network, "caching in progress")
-
- // Nested error.
- e2 := E(Op("Read"), Undefined, e1)
-
- b := MarshalError(e2)
- e3 := UnmarshalError(b)
-
- in := e2.(*Error)
- out := e3.(*Error)
-
- // Compare elementwise.
- if in.Op != out.Op {
- t.Errorf("expected Op %q; got %q", in.Op, out.Op)
- }
- if in.Kind != out.Kind {
- t.Errorf("expected kind %d; got %d", in.Kind, out.Kind)
- }
- // Note that error will have lost type information, so just check its Error string.
- if in.Err.Error() != out.Err.Error() {
- t.Errorf("expected Err %q; got %q", in.Err, out.Err)
- }
-}
-
-func TestSeparator(t *testing.T) {
- defer func(prev string) {
- Separator = prev
- }(Separator)
- Separator = ":: "
-
- // Single error. No user is set, so we will have a zero-length field inside.
- e1 := E(Op("Get"), Network, "network error")
-
- // Nested error.
- e2 := E(Op("Get"), Network, e1)
-
- want := "Get: Network error:: Get: network error"
- if errorAsString(e2) != want {
- t.Errorf("expected %q; got %q", want, e2)
- }
-}
-
-func TestDoesNotChangePreviousError(t *testing.T) {
- err := E(Network)
- err2 := E(Op("I will NOT modify err"), err)
-
- expected := "I will NOT modify err: Network error"
- if errorAsString(err2) != expected {
- t.Fatalf("Expected %q, got %q", expected, err2)
- }
- kind := err.(*Error).Kind
- if kind != Network {
- t.Fatalf("Expected kind %v, got %v", Network, kind)
- }
-}
-
-//func TestNoArgs(t *testing.T) {
-// defer func() {
-// err := recover()
-// if err == nil {
-// t.Fatal("E() did not panic")
-// }
-// }()
-// _ = E()
-//}
-
-type matchTest struct {
- err1, err2 error
- matched bool
-}
-
-const (
- op = Op("Op")
- op1 = Op("Op1")
- op2 = Op("Op2")
-)
-
-var matchTests = []matchTest{
- // Errors not of type *Error fail outright.
- {nil, nil, false},
- {io.EOF, io.EOF, false},
- {E(io.EOF), io.EOF, false},
- {io.EOF, E(io.EOF), false},
- // Success. We can drop fields from the first argument and still match.
- {E(io.EOF), E(io.EOF), true},
- {E(op, Other, io.EOF), E(op, Other, io.EOF), true},
- {E(op, Other, io.EOF, "test"), E(op, Other, io.EOF, "test", "test"), true},
- {E(op, Other), E(op, Other, io.EOF, "test", "test"), true},
- {E(op), E(op, Other, io.EOF, "test", "test"), true},
- // Failure.
- {E(io.EOF), E(io.ErrClosedPipe), false},
- {E(op1), E(op2), false},
- {E(Other), E(Network), false},
- {E("test"), E("test1"), false},
- {E(fmt.Errorf("error")), E(fmt.Errorf("error1")), false},
- {E(op, Other, io.EOF, "test", "test1"), E(op, Other, io.EOF, "test", "test"), false},
- {E("test", Str("something")), E("test"), false}, // Test nil error on rhs.
- // Nested *Errors.
- {E(op1, E("test")), E(op1, "1", E(op2, "2", "test")), true},
- {E(op1, "test"), E(op1, "1", E(op2, "2", "test")), false},
- {E(op1, E("test")), E(op1, "1", Str(E(op2, "2", "test").Error())), false},
-}
-
-func TestMatch(t *testing.T) {
- for _, test := range matchTests {
- matched := Match(test.err1, test.err2)
- if matched != test.matched {
- t.Errorf("Match(%q, %q)=%t; want %t", test.err1, test.err2, matched, test.matched)
- }
- }
-}
-
-type kindTest struct {
- err error
- kind Kind
- want bool
-}
-
-var kindTests = []kindTest{
- //Non-Error errors.
- {nil, Network, false},
- {Str("not an *Error"), Network, false},
-
- // Basic comparisons.
- {E(Network), Network, true},
- {E(Test), Network, false},
- {E("no kind"), Network, false},
- {E("no kind"), Other, false},
-
- // Nested *Error values.
- {E("Nesting", E(Network)), Network, true},
- {E("Nesting", E(Test)), Network, false},
- {E("Nesting", E("no kind")), Network, false},
- {E("Nesting", E("no kind")), Other, false},
-}
-
-func TestKind(t *testing.T) {
- for _, test := range kindTests {
- got := Is(test.kind, test.err)
- if got != test.want {
- t.Errorf("Is(%q, %q)=%t; want %t", test.kind, test.err, got, test.want)
- }
- }
-}
-
-func errorAsString(err error) string {
- if e, ok := err.(*Error); ok {
- e2 := *e
- e2.stack = stack{}
- return e2.Error()
- }
- return err.Error()
-}
diff --git a/errors/go.mod b/errors/go.mod
deleted file mode 100755
index 1eaacc23..00000000
--- a/errors/go.mod
+++ /dev/null
@@ -1,3 +0,0 @@
-module github.com/48d90782/errors
-
-go 1.15
diff --git a/errors/marshal.go b/errors/marshal.go
index a13ec01f..7c8a63ef 100755
--- a/errors/marshal.go
+++ b/errors/marshal.go
@@ -35,7 +35,6 @@ func MarshalErrorAppend(err error, b []byte) []byte {
b = append(b, 'e')
b = appendString(b, err.Error())
return b
-
}
func MarshalError(err error) []byte {
diff --git a/pipe_factory.go b/pipe_factory.go
index a6c94614..807d7793 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -84,7 +84,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
}
// todo kill timeout
- errK := w.Kill(ctx)
+ errK := w.Kill()
if errK != nil {
errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
}
@@ -164,8 +164,7 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
errs = append(errs, errF.Error())
}
- // todo kill timeout ??
- errK := w.Kill(context.Background())
+ errK := w.Kill()
if errK != nil {
errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
}
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 9f401bec..2311b7bf 100755
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -61,11 +61,11 @@ func (f *Foo2) Serve() chan error {
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: roadrunner.SupervisorConfig{
+ Supervisor: &roadrunner.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
- ExecTTL: time.Second * 10,
+ ExecTTL: 10,
MaxWorkerMemory: 1000,
},
}
diff --git a/pool.go b/pool.go
index aca1b340..a95b8cfb 100755
--- a/pool.go
+++ b/pool.go
@@ -54,6 +54,8 @@ type Pool interface {
// Exec
Exec(rqs Payload) (Payload, error)
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+
// Workers returns worker list associated with the pool.
Workers() (workers []WorkerBase)
@@ -87,7 +89,7 @@ type Config struct {
DestroyTimeout time.Duration
// Supervision config to limit worker and pool memory usage.
- Supervisor SupervisorConfig
+ Supervisor *SupervisorConfig
}
// InitDefaults enables default config values.
@@ -103,22 +105,24 @@ func (cfg *Config) InitDefaults() {
if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
-
+ if cfg.Supervisor == nil {
+ return
+ }
cfg.Supervisor.InitDefaults()
}
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick time.Duration
+ WatchTick uint64
// TTL defines maximum time worker is allowed to live.
- TTL int64
+ TTL uint64
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL int64
+ IdleTTL uint64
// ExecTTL defines maximum lifetime per job.
- ExecTTL time.Duration
+ ExecTTL uint64
// MaxWorkerMemory limits memory per worker.
MaxWorkerMemory uint64
@@ -127,6 +131,6 @@ type SupervisorConfig struct {
// InitDefaults enables default config values.
func (cfg *SupervisorConfig) InitDefaults() {
if cfg.WatchTick == 0 {
- cfg.WatchTick = time.Second
+ cfg.WatchTick = 1
}
}
diff --git a/socket_factory.go b/socket_factory.go
index ed151f2d..6f29db22 100755
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -109,7 +109,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
if err != nil {
err = multierr.Combine(
err,
- w.Kill(context.Background()),
+ w.Kill(),
w.Wait(context.Background()),
)
@@ -158,7 +158,7 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
rl, err := f.findRelay(w)
if err != nil {
errs = append(errs, err.Error())
- err = w.Kill(ctx)
+ err = w.Kill()
if err != nil {
errs = append(errs, err.Error())
}
diff --git a/state.go b/state.go
index 2e36c977..91c29946 100755
--- a/state.go
+++ b/state.go
@@ -43,7 +43,6 @@ const (
StateStopping
StateKilling
- StateKilled
// State of worker, when no need to allocate new one
StateDestroyed
diff --git a/static_pool.go b/static_pool.go
index 6f247d9e..be7ad6e3 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -4,11 +4,9 @@ import (
"context"
"fmt"
"os/exec"
- "sync"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
-
- "github.com/pkg/errors"
)
// StopRequest can be sent by worker to indicate that restart is required.
@@ -29,14 +27,8 @@ type StaticPool struct {
// distributes the events
events *util.EventHandler
- // protects state of worker list, does not affect allocation
- muw sync.RWMutex
-
// manages worker states and TTLs
ww *workerWatcher
-
- // supervises memory and TTL of workers
- // sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -79,9 +71,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
return nil, err
}
- // todo: implement
- // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
- // p.sp.Start()
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
return p, nil
}
@@ -106,13 +102,13 @@ func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
}
func (sp *StaticPool) Exec(p Payload) (Payload, error) {
+ const op = errors.Op("Exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
-
w, err := sp.ww.GetFreeWorker(context.Background())
- if err != nil && errors.Is(err, ErrWatcherStopped) {
- return EmptyPayload, ErrWatcherStopped
+ if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ return EmptyPayload, errors.E(op, err)
} else if err != nil {
return EmptyPayload, err
}
@@ -189,76 +185,71 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
return r, err
}
-// Exec one task with given payload and context, returns result or error.
-// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
-// // todo: why TODO passed here?
-// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
-// defer cancel()
-// w, err := p.ww.GetFreeWorker(getWorkerCtx)
-// if err != nil && errors.Is(err, ErrWatcherStopped) {
-// return EmptyPayload, ErrWatcherStopped
-// } else if err != nil {
-// return EmptyPayload, err
-// }
-//
-// sw := w.(SyncWorker)
-//
-// // todo: implement worker destroy
-// //execCtx context.Context
-// //if p.cfg.Supervisor.ExecTTL != 0 {
-// // var cancel2 context.CancelFunc
-// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
-// // defer cancel2()
-// //} else {
-// // execCtx = ctx
-// //}
-//
-// rsp, err := sw.Exec(rqs)
-// if err != nil {
-// errJ := p.checkMaxJobs(ctx, w)
-// if errJ != nil {
-// // todo: worker was not destroyed
-// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
-// }
-//
-// // soft job errors are allowed
-// if _, jobError := err.(ExecError); jobError {
-// p.ww.PushWorker(w)
-// return EmptyPayload, err
-// }
-//
-// sw.State().Set(StateInvalid)
-// errS := w.Stop(ctx)
-// if errS != nil {
-// return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
-// }
-//
-// return EmptyPayload, err
-// }
-//
-// // worker want's to be terminated
-// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-// w.State().Set(StateInvalid)
-// err = w.Stop(ctx)
-// if err != nil {
-// return EmptyPayload, err
-// }
-// return p.ExecWithContext(ctx, rqs)
-// }
-//
-// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-// err = p.ww.AllocateNew(ctx)
-// if err != nil {
-// return EmptyPayload, err
-// }
-// } else {
-// p.muw.Lock()
-// p.ww.PushWorker(w)
-// p.muw.Unlock()
-// }
-//
-// return rsp, nil
-// }
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("Exec")
+ w, err := sp.ww.GetFreeWorker(context.Background())
+ if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ return EmptyPayload, errors.E(op, err)
+ } else if err != nil {
+ return EmptyPayload, err
+ }
+
+ sw := w.(SyncWorker)
+
+ rsp, err := sw.ExecWithContext(ctx, rqs)
+ if err != nil {
+ // soft job errors are allowed
+ if _, jobError := err.(ExecError); jobError {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(bCtx)
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+
+ return EmptyPayload, err
+ }
+
+ sw.State().Set(StateInvalid)
+ sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ errS := w.Stop(bCtx)
+
+ if errS != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ }
+
+ return EmptyPayload, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+
+ return sp.Exec(rqs)
+ }
+
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+ return rsp, nil
+}
// Destroy all underlying stack (but let them to complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
diff --git a/static_pool_test.go b/static_pool_test.go
index b75bd0bf..f1e3e4e4 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -235,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
})
// killing random worker and expecting pool to replace it
- err = p.Workers()[0].Kill(ctx)
+ err = p.Workers()[0].Kill()
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 9d1d2b1e..5dca3c22 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -2,8 +2,10 @@ package roadrunner
import (
"context"
+ "sync"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
)
@@ -11,31 +13,101 @@ const MB = 1024 * 1024
type SupervisedPool interface {
Pool
-
- // ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context
- // deadline reached.
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ // Start used to start watching process for all pool workers
+ Start()
}
type supervisedPool struct {
- cfg SupervisorConfig
+ cfg *SupervisorConfig
events *util.EventHandler
pool Pool
stopCh chan struct{}
+ mu *sync.RWMutex
}
-func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
- return &supervisedPool{
+func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool {
+ sp := &supervisedPool{
cfg: cfg,
events: events,
pool: pool,
+ mu: &sync.RWMutex{},
stopCh: make(chan struct{}),
}
+ return sp
+}
+
+type ttlExec struct {
+ err error
+ p Payload
+}
+
+func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("exec_supervised")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: err,
+ p: EmptyPayload,
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
+func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
+ return sp.pool.Exec(p)
+}
+
+func (sp *supervisedPool) AddListener(listener util.EventListener) {
+ sp.pool.AddListener(listener)
+}
+
+func (sp *supervisedPool) GetConfig() Config {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervisedPool) Workers() (workers []WorkerBase) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error {
+ return sp.pool.RemoveWorker(ctx, worker)
+}
+
+func (sp *supervisedPool) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
}
func (sp *supervisedPool) Start() {
go func() {
- watchTout := time.NewTicker(sp.cfg.WatchTick)
+ watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
for {
select {
case <-sp.stopCh:
@@ -43,7 +115,9 @@ func (sp *supervisedPool) Start() {
return
// stop here
case <-watchTout.C:
+ sp.mu.Lock()
sp.control()
+ sp.mu.Unlock()
}
}
}()
@@ -89,7 +163,7 @@ func (sp *supervisedPool) control() {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
return
} else {
- sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
}
continue
@@ -116,7 +190,7 @@ func (sp *supervisedPool) control() {
res := int64(lu) - now.UnixNano()
// maxWorkerIdle more than diff between now and last used
- if sp.cfg.IdleTTL-res <= 0 {
+ if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
diff --git a/supervisor_test.go b/supervisor_test.go
new file mode 100644
index 00000000..34172d7d
--- /dev/null
+++ b/supervisor_test.go
@@ -0,0 +1,150 @@
+package roadrunner
+
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var cfgSupervised = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 100,
+ MaxWorkerMemory: 100,
+ },
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/memleak.php", "pipes") },
+ NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ stopCh := make(chan struct{})
+ defer p.Destroy(context.Background())
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ workers := p.Workers()
+ if len(workers) > 0 {
+ s, err := WorkerProcessState(workers[0])
+ assert.NoError(t, err)
+ assert.NotNil(t, s)
+ // since this is soft limit, double max memory limit watch
+ if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
+ assert.Fail(t, "max memory reached")
+ }
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 50)
+ _, err = p.Exec(Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ stopCh <- struct{}{}
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 1,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
+ NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Error(t, err)
+ assert.Empty(t, resp)
+
+ time.Sleep(time.Second * 1)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 4,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
+ NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp)
+
+ time.Sleep(time.Second * 1)
+ // should be the same pid
+ assert.Equal(t, pid, p.Workers()[0].Pid())
+}
diff --git a/sync_worker.go b/sync_worker.go
index 2f3eb1e4..d933077b 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,9 +5,10 @@ import (
"fmt"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
+ "go.uber.org/multierr"
- "github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
@@ -18,7 +19,9 @@ type SyncWorker interface {
WorkerBase
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(p Payload) (Payload, error)
+ Exec(rqs Payload) (Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithContext(ctx context.Context, p Payload) (Payload, error)
}
type syncWorker struct {
@@ -60,14 +63,82 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
return rsp, nil
}
+type wexec struct {
+ payload Payload
+ err error
+}
+
+// Exec payload without TTL timeout.
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) {
+ const op = errors.Op("exec_with_context")
+ c := make(chan wexec, 1)
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.w.State().Value() != StateReady {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ if _, ok := err.(ExecError); !ok {
+ tw.w.State().Set(StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ tw.w.State().Set(StateReady)
+ tw.w.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ return EmptyPayload, multierr.Append(err, ctx.Err())
+ }
+ return EmptyPayload, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+ return res.payload, nil
+ }
+}
+
func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
+ const op = errors.Op("exec_payload")
// two things; todo: merge
if err := sendControl(tw.w.Relay(), p.Context); err != nil {
- return EmptyPayload, errors.Wrap(err, "header error")
+ return EmptyPayload, errors.E(op, err, "header error")
}
if err := tw.w.Relay().Send(p.Body, 0); err != nil {
- return EmptyPayload, errors.Wrap(err, "sender error")
+ return EmptyPayload, errors.E(op, err, "sender error")
}
var pr goridge.Prefix
@@ -75,7 +146,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
var err error
if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
if !pr.HasFlag(goridge.PayloadControl) {
@@ -88,7 +159,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
// add streaming support :)
if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
return rsp, nil
@@ -126,8 +197,8 @@ func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *syncWorker) Kill(ctx context.Context) error {
- return tw.w.Kill(ctx)
+func (tw *syncWorker) Kill() error {
+ return tw.w.Kill()
}
func (tw *syncWorker) Relay() goridge.Relay {
diff --git a/tests/memleak.php b/tests/memleak.php
new file mode 100644
index 00000000..b78a76c0
--- /dev/null
+++ b/tests/memleak.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require dirname(__DIR__) . "/vendor_php/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+$mem = '';
+while($rr->receive($ctx)){
+ $mem .= str_repeat(" ", 1024*1024);
+ $rr->send("");
+} \ No newline at end of file
diff --git a/tests/sleep.php b/tests/sleep.php
new file mode 100644
index 00000000..b3ea8235
--- /dev/null
+++ b/tests/sleep.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require dirname(__DIR__) . "/vendor_php/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->receive($ctx)){
+ sleep(3);
+ $rr->send("");
+} \ No newline at end of file
diff --git a/worker.go b/worker.go
index 2dda51cc..ef532f51 100755
--- a/worker.go
+++ b/worker.go
@@ -74,7 +74,7 @@ type WorkerBase interface {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
- Kill(ctx context.Context) error
+ Kill() error
// Relay returns attached to worker goridge relay
Relay() goridge.Relay
@@ -280,7 +280,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
-func (w *WorkerProcess) Kill(ctx context.Context) error {
+func (w *WorkerProcess) Kill() error {
w.state.Set(StateKilling)
w.mu.Lock()
defer w.mu.Unlock()
diff --git a/worker_test.go b/worker_test.go
index d2744345..78738064 100755
--- a/worker_test.go
+++ b/worker_test.go
@@ -47,7 +47,7 @@ func Test_Kill(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, StateReady, w.State().Value())
- err = w.Kill(ctx)
+ err = w.Kill()
if err != nil {
t.Errorf("error killing the WorkerProcess: error %v", err)
}
diff --git a/worker_watcher.go b/worker_watcher.go
index 5ae54024..d289750e 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -2,15 +2,14 @@ package roadrunner
import (
"context"
- "errors"
+ "runtime"
"sync"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
)
-var ErrWatcherStopped = errors.New("watcher stopped")
-
type Stack struct {
workers []WorkerBase
mutex sync.RWMutex
@@ -19,7 +18,7 @@ type Stack struct {
func NewWorkersStack() *Stack {
return &Stack{
- workers: make([]WorkerBase, 0),
+ workers: make([]WorkerBase, 0, runtime.NumCPU()),
}
}
@@ -85,11 +84,7 @@ type WorkerWatcher interface {
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func newWorkerWatcher(
- allocator func(args ...interface{}) (WorkerBase, error),
- numWorkers int64,
- events *util.EventHandler,
-) *workerWatcher {
+func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -127,10 +122,11 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
}
func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+ const op = errors.Op("get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
- return nil, ErrWatcherStopped
+ return nil, errors.E(op, errors.ErrWatcherStopped)
}
// handle worker remove state
@@ -146,6 +142,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// no free stack
if w == nil {
+ // TODO allocate timeout
tout := time.NewTicker(time.Second * 180)
defer tout.Stop()
for {
@@ -153,20 +150,20 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
default:
w, stop = ww.stack.Pop()
if stop {
- return nil, ErrWatcherStopped
+ return nil, errors.E(op, errors.ErrWatcherStopped)
}
if w == nil {
continue
}
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
return w, nil
case <-tout.C:
- return nil, errors.New("no free stack")
+ return nil, errors.Str("no free stack")
}
}
}
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
return w, nil
}
@@ -198,10 +195,10 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
// found in the stack
// remove worker
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
wb.State().Set(StateInvalid)
- err := wb.Kill(ctx)
+ err := wb.Kill()
if err != nil {
return err
}
@@ -215,14 +212,19 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
// O(1) operation
func (ww *workerWatcher) PushWorker(w WorkerBase) {
- ww.mutex.Lock()
- ww.actualNumWorkers++
- ww.mutex.Unlock()
+ ww.IncreaseWorkersCount()
ww.stack.Push(w)
}
func (ww *workerWatcher) ReduceWorkersCount() {
- ww.decreaseNumOfActualWorkers()
+ ww.mutex.Lock()
+ ww.actualNumWorkers--
+ ww.mutex.Unlock()
+}
+func (ww *workerWatcher) IncreaseWorkersCount() {
+ ww.mutex.Lock()
+ ww.actualNumWorkers++
+ ww.mutex.Unlock()
}
// Destroy all underlying stack (but let them to complete the task)
@@ -258,9 +260,17 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation
+// Warning, this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) WorkersList() []WorkerBase {
- return ww.stack.workers
+ ww.stack.mutex.Lock()
+ defer ww.stack.mutex.Unlock()
+ workersCopy := make([]WorkerBase, 0, 1)
+ for _, v := range ww.stack.workers {
+ sw := v.(SyncWorker)
+ workersCopy = append(workersCopy, sw)
+ }
+
+ return workersCopy
}
func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
@@ -284,7 +294,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
// worker in the stack, reallocating
if ww.stack.workers[i].Pid() == pid {
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
ww.stack.mutex.Unlock()
err = ww.AllocateNew(ctx)
@@ -321,9 +331,3 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.wait(context.Background(), wb)
}()
}
-
-func (ww *workerWatcher) decreaseNumOfActualWorkers() {
- ww.mutex.Lock()
- ww.actualNumWorkers--
- ww.mutex.Unlock()
-}