author     Valery Piashchynski <[email protected]>  2020-10-27 16:15:11 +0300
committer  GitHub <[email protected]>                2020-10-27 16:15:11 +0300
commit     47a570c220a36ae7b770ea594a41637fa31fc8e8
tree       db4c7e348c6b03c478a907cf89c8baaf68195d73
parent     105bde0e0c1a7c133d1daa10603ca5ce9a9ade4d
parent     27392d90f3b208adc5215ce876698650a4188463
Merge pull request #377 from spiral/feature/pool_supervisor
Feature/pool supervisor
-rwxr-xr-x  .github/workflows/ci-build.yml       2
-rwxr-xr-x  composer.lock                      167
-rwxr-xr-x  errors/errors.go                    16
-rwxr-xr-x  errors/errors_test.go              177
-rwxr-xr-x  errors/go.mod                        3
-rwxr-xr-x  errors/marshal.go                    1
-rwxr-xr-x  pipe_factory.go                      5
-rwxr-xr-x  plugins/factory/tests/plugin_2.go    4
-rwxr-xr-x  pool.go                             18
-rwxr-xr-x  socket_factory.go                    4
-rwxr-xr-x  state.go                             1
-rwxr-xr-x  static_pool.go                     161
-rwxr-xr-x  static_pool_test.go                  2
-rwxr-xr-x  supervisor_pool.go                  94
-rw-r--r--  supervisor_test.go                 150
-rwxr-xr-x  sync_worker.go                      87
-rw-r--r--  tests/memleak.php                   15
-rw-r--r--  tests/sleep.php                     15
-rwxr-xr-x  worker.go                            4
-rwxr-xr-x  worker_test.go                       2
-rwxr-xr-x  worker_watcher.go                   62
21 files changed, 593 insertions, 397 deletions
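
The heart of this PR is the supervised pool: when Config.Supervisor is non-nil, NewPool wraps the static pool in a watcher that enforces worker TTL, idle TTL, a per-job ExecTTL and a soft memory ceiling. Below is a minimal construction sketch mirroring the configuration exercised by the new supervisor_test.go; the import path assumes the v2 module layout of this branch, and the numeric values (plain seconds/megabytes in this PR, no longer time.Duration) are illustrative only.

package main

import (
	"context"
	"os/exec"
	"time"

	roadrunner "github.com/spiral/roadrunner/v2" // assumed module path for this branch
)

func main() {
	ctx := context.Background()

	p, err := roadrunner.NewPool(
		ctx,
		func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
		roadrunner.NewPipeFactory(),
		roadrunner.Config{
			NumWorkers:      1,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
			// A non-nil Supervisor makes NewPool return the supervised wrapper.
			Supervisor: &roadrunner.SupervisorConfig{
				WatchTick:       1,   // seconds between control() passes
				TTL:             100, // max worker lifetime, seconds
				IdleTTL:         100, // max idle time, seconds
				ExecTTL:         10,  // max time per job, seconds
				MaxWorkerMemory: 100, // soft limit, MB
			},
		},
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy(ctx)

	// ExecTTL is enforced here: on deadline the job fails with a TimeOut kind
	// and the worker is killed and replaced.
	_, err = p.ExecWithContext(ctx, roadrunner.Payload{Body: []byte("foo")})
	_ = err
}
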
diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
index 585858c1..94549c37 100755
--- a/.github/workflows/ci-build.yml
+++ b/.github/workflows/ci-build.yml
@@ -21,7 +21,7 @@ jobs:
          go-version: ${{ matrix.go }}
      - name: Set up PHP ${{ matrix.php }}
-       uses: shivammathur/setup-php@v1
+       uses: shivammathur/setup-php@v2
        with:
          php-version: ${{ matrix.php }}
          extensions: dom
diff --git a/composer.lock b/composer.lock
index 7a1094b1..13e6af6b 100755
--- a/composer.lock
+++ b/composer.lock
@@ -89,6 +89,14 @@
                "psr-17",
                "psr-7"
            ],
+           "support": {
+               "chat": "https://laminas.dev/chat",
+               "docs": "https://docs.laminas.dev/laminas-diactoros/",
+               "forum": "https://discourse.laminas.dev",
+               "issues": "https://github.com/laminas/laminas-diactoros/issues",
+               "rss": "https://github.com/laminas/laminas-diactoros/releases.atom",
+               "source": "https://github.com/laminas/laminas-diactoros"
+           },
            "funding": [
                {
                    "url": "https://funding.communitybridge.org/projects/laminas-project",
@@ -143,6 +151,12 @@
                "laminas",
                "zf"
            ],
+           "support": {
+               "forum": "https://discourse.laminas.dev/",
+               "issues": "https://github.com/laminas/laminas-zendframework-bridge/issues",
+               "rss": "https://github.com/laminas/laminas-zendframework-bridge/releases.atom",
+               "source": "https://github.com/laminas/laminas-zendframework-bridge"
+           },
            "funding": [
                {
                    "url": "https://funding.communitybridge.org/projects/laminas-project",
@@ -198,6 +212,10 @@
                "container-interop",
                "psr"
            ],
+           "support": {
+               "issues": "https://github.com/php-fig/container/issues",
+               "source": "https://github.com/php-fig/container/tree/master"
+           },
            "time": "2017-02-14T16:28:37+00:00"
        },
        {
@@ -250,6 +268,9 @@
                "request",
                "response"
            ],
+           "support": {
+               "source": "https://github.com/php-fig/http-factory/tree/master"
+           },
            "time": "2019-04-30T12:38:16+00:00"
        },
        {
@@ -300,6 +321,9 @@
                "request",
                "response"
            ],
+           "support": {
+               "source": "https://github.com/php-fig/http-message/tree/master"
+           },
            "time": "2016-08-06T14:39:51+00:00"
        },
        {
@@ -345,20 +369,24 @@
                }
            ],
            "description": "High-performance PHP-to-Golang RPC bridge",
+           "support": {
+               "issues": "https://github.com/spiral/goridge-php/issues",
+               "source": "https://github.com/spiral/goridge-php/tree/v2.4.5"
+           },
            "time": "2020-08-14T14:28:30+00:00"
        },
        {
            "name": "symfony/console",
-           "version": "v5.1.6",
+           "version": "v5.1.7",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/console.git",
-               "reference": "04c3a31fe8ea94b42c9e2d1acc93d19782133b00"
+               "reference": "ae789a8a2ad189ce7e8216942cdb9b77319f5eb8"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/console/zipball/04c3a31fe8ea94b42c9e2d1acc93d19782133b00",
-               "reference": "04c3a31fe8ea94b42c9e2d1acc93d19782133b00",
+               "url": "https://api.github.com/repos/symfony/console/zipball/ae789a8a2ad189ce7e8216942cdb9b77319f5eb8",
+               "reference": "ae789a8a2ad189ce7e8216942cdb9b77319f5eb8",
                "shasum": ""
            },
            "require": {
@@ -424,6 +452,9 @@
            ],
            "description": "Symfony Console Component",
            "homepage": "https://symfony.com",
+           "support": {
+               "source": "https://github.com/symfony/console/tree/v5.1.7"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -438,24 +469,24 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-09-18T14:27:32+00:00"
+           "time": "2020-10-07T15:23:00+00:00"
        },
        {
            "name": "symfony/polyfill-ctype",
-           "version": "v1.18.1",
+           "version": "v1.20.0",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/polyfill-ctype.git",
-               "reference": "1c302646f6efc070cd46856e600e5e0684d6b454"
+               "reference": "f4ba089a5b6366e453971d3aad5fe8e897b37f41"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/1c302646f6efc070cd46856e600e5e0684d6b454",
-               "reference": "1c302646f6efc070cd46856e600e5e0684d6b454",
+               "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/f4ba089a5b6366e453971d3aad5fe8e897b37f41",
+               "reference": "f4ba089a5b6366e453971d3aad5fe8e897b37f41",
                "shasum": ""
            },
            "require": {
-               "php": ">=5.3.3"
+               "php": ">=7.1"
            },
            "suggest": {
                "ext-ctype": "For best performance"
@@ -463,7 +494,7 @@
            "type": "library",
            "extra": {
                "branch-alias": {
-                   "dev-master": "1.18-dev"
+                   "dev-main": "1.20-dev"
                },
                "thanks": {
                    "name": "symfony/polyfill",
@@ -500,6 +531,9 @@
                "polyfill",
                "portable"
            ],
+           "support": {
+               "source": "https://github.com/symfony/polyfill-ctype/tree/v1.20.0"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -514,24 +548,24 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-07-14T12:35:20+00:00"
+           "time": "2020-10-23T14:02:19+00:00"
        },
        {
            "name": "symfony/polyfill-intl-grapheme",
-           "version": "v1.18.1",
+           "version": "v1.20.0",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/polyfill-intl-grapheme.git",
-               "reference": "b740103edbdcc39602239ee8860f0f45a8eb9aa5"
+               "reference": "c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/b740103edbdcc39602239ee8860f0f45a8eb9aa5",
-               "reference": "b740103edbdcc39602239ee8860f0f45a8eb9aa5",
+               "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c",
+               "reference": "c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c",
                "shasum": ""
            },
            "require": {
-               "php": ">=5.3.3"
+               "php": ">=7.1"
            },
            "suggest": {
                "ext-intl": "For best performance"
@@ -539,7 +573,7 @@
            "type": "library",
            "extra": {
                "branch-alias": {
-                   "dev-master": "1.18-dev"
+                   "dev-main": "1.20-dev"
                },
                "thanks": {
                    "name": "symfony/polyfill",
@@ -578,6 +612,9 @@
                "portable",
                "shim"
            ],
+           "support": {
+               "source": "https://github.com/symfony/polyfill-intl-grapheme/tree/v1.20.0"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -592,24 +629,24 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-07-14T12:35:20+00:00"
+           "time": "2020-10-23T14:02:19+00:00"
        },
        {
            "name": "symfony/polyfill-intl-normalizer",
-           "version": "v1.18.1",
+           "version": "v1.20.0",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/polyfill-intl-normalizer.git",
-               "reference": "37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e"
+               "reference": "727d1096295d807c309fb01a851577302394c897"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e",
-               "reference": "37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e",
+               "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/727d1096295d807c309fb01a851577302394c897",
+               "reference": "727d1096295d807c309fb01a851577302394c897",
                "shasum": ""
            },
            "require": {
-               "php": ">=5.3.3"
+               "php": ">=7.1"
            },
            "suggest": {
                "ext-intl": "For best performance"
@@ -617,7 +654,7 @@
            "type": "library",
            "extra": {
                "branch-alias": {
-                   "dev-master": "1.18-dev"
+                   "dev-main": "1.20-dev"
                },
                "thanks": {
                    "name": "symfony/polyfill",
@@ -659,6 +696,9 @@
                "portable",
                "shim"
            ],
+           "support": {
+               "source": "https://github.com/symfony/polyfill-intl-normalizer/tree/v1.20.0"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -673,24 +713,24 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-07-14T12:35:20+00:00"
+           "time": "2020-10-23T14:02:19+00:00"
        },
        {
            "name": "symfony/polyfill-mbstring",
-           "version": "v1.18.1",
+           "version": "v1.20.0",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/polyfill-mbstring.git",
-               "reference": "a6977d63bf9a0ad4c65cd352709e230876f9904a"
+               "reference": "39d483bdf39be819deabf04ec872eb0b2410b531"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/a6977d63bf9a0ad4c65cd352709e230876f9904a",
-               "reference": "a6977d63bf9a0ad4c65cd352709e230876f9904a",
+               "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/39d483bdf39be819deabf04ec872eb0b2410b531",
+               "reference": "39d483bdf39be819deabf04ec872eb0b2410b531",
                "shasum": ""
            },
            "require": {
-               "php": ">=5.3.3"
+               "php": ">=7.1"
            },
            "suggest": {
                "ext-mbstring": "For best performance"
@@ -698,7 +738,7 @@
            "type": "library",
            "extra": {
                "branch-alias": {
-                   "dev-master": "1.18-dev"
+                   "dev-main": "1.20-dev"
                },
                "thanks": {
                    "name": "symfony/polyfill",
@@ -736,6 +776,9 @@
                "portable",
                "shim"
            ],
+           "support": {
+               "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.20.0"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -750,29 +793,29 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-07-14T12:35:20+00:00"
+           "time": "2020-10-23T14:02:19+00:00"
        },
        {
            "name": "symfony/polyfill-php73",
-           "version": "v1.18.1",
+           "version": "v1.20.0",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/polyfill-php73.git",
-               "reference": "fffa1a52a023e782cdcc221d781fe1ec8f87fcca"
+               "reference": "8ff431c517be11c78c48a39a66d37431e26a6bed"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/polyfill-php73/zipball/fffa1a52a023e782cdcc221d781fe1ec8f87fcca",
-               "reference": "fffa1a52a023e782cdcc221d781fe1ec8f87fcca",
+               "url": "https://api.github.com/repos/symfony/polyfill-php73/zipball/8ff431c517be11c78c48a39a66d37431e26a6bed",
+               "reference": "8ff431c517be11c78c48a39a66d37431e26a6bed",
                "shasum": ""
            },
            "require": {
-               "php": ">=5.3.3"
+               "php": ">=7.1"
            },
            "type": "library",
            "extra": {
                "branch-alias": {
-                   "dev-master": "1.18-dev"
+                   "dev-main": "1.20-dev"
                },
                "thanks": {
                    "name": "symfony/polyfill",
@@ -812,6 +855,9 @@
                "portable",
                "shim"
            ],
+           "support": {
+               "source": "https://github.com/symfony/polyfill-php73/tree/v1.20.0"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -826,29 +872,29 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-07-14T12:35:20+00:00"
+           "time": "2020-10-23T14:02:19+00:00"
        },
        {
            "name": "symfony/polyfill-php80",
-           "version": "v1.18.1",
+           "version": "v1.20.0",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/polyfill-php80.git",
-               "reference": "d87d5766cbf48d72388a9f6b85f280c8ad51f981"
+               "reference": "e70aa8b064c5b72d3df2abd5ab1e90464ad009de"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/d87d5766cbf48d72388a9f6b85f280c8ad51f981",
-               "reference": "d87d5766cbf48d72388a9f6b85f280c8ad51f981",
+               "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/e70aa8b064c5b72d3df2abd5ab1e90464ad009de",
+               "reference": "e70aa8b064c5b72d3df2abd5ab1e90464ad009de",
                "shasum": ""
            },
            "require": {
-               "php": ">=7.0.8"
+               "php": ">=7.1"
            },
            "type": "library",
            "extra": {
                "branch-alias": {
-                   "dev-master": "1.18-dev"
+                   "dev-main": "1.20-dev"
                },
                "thanks": {
                    "name": "symfony/polyfill",
@@ -892,6 +938,9 @@
                "portable",
                "shim"
            ],
+           "support": {
+               "source": "https://github.com/symfony/polyfill-php80/tree/v1.20.0"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -906,7 +955,7 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-07-14T12:35:20+00:00"
+           "time": "2020-10-23T14:02:19+00:00"
        },
        {
            "name": "symfony/service-contracts",
@@ -968,6 +1017,9 @@
                "interoperability",
                "standards"
            ],
+           "support": {
+               "source": "https://github.com/symfony/service-contracts/tree/master"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -986,7 +1038,7 @@
        },
        {
            "name": "symfony/string",
-           "version": "v5.1.6",
+           "version": "v5.1.7",
            "source": {
                "type": "git",
                "url": "https://github.com/symfony/string.git",
@@ -1053,6 +1105,9 @@
                "utf-8",
                "utf8"
            ],
+           "support": {
+               "source": "https://github.com/symfony/string/tree/v5.1.7"
+           },
            "funding": [
                {
                    "url": "https://symfony.com/sponsor",
@@ -1073,16 +1128,16 @@
    "packages-dev": [
        {
            "name": "phpstan/phpstan",
-           "version": "0.12.46",
+           "version": "0.12.52",
            "source": {
                "type": "git",
                "url": "https://github.com/phpstan/phpstan.git",
-               "reference": "9419738e20f0c49757be05d22969c1c44c1dff3b"
+               "reference": "e96dd5e7ae9aefed663bc7e285ad96792b67eadc"
            },
            "dist": {
                "type": "zip",
-               "url": "https://api.github.com/repos/phpstan/phpstan/zipball/9419738e20f0c49757be05d22969c1c44c1dff3b",
-               "reference": "9419738e20f0c49757be05d22969c1c44c1dff3b",
+               "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e96dd5e7ae9aefed663bc7e285ad96792b67eadc",
+               "reference": "e96dd5e7ae9aefed663bc7e285ad96792b67eadc",
                "shasum": ""
            },
            "require": {
@@ -1111,6 +1166,10 @@
                "MIT"
            ],
            "description": "PHPStan - PHP Static Analysis Tool",
+           "support": {
+               "issues": "https://github.com/phpstan/phpstan/issues",
+               "source": "https://github.com/phpstan/phpstan/tree/0.12.52"
+           },
            "funding": [
                {
                    "url": "https://github.com/ondrejmirtes",
@@ -1125,7 +1184,7 @@
                    "type": "tidelift"
                }
            ],
-           "time": "2020-09-28T09:48:55+00:00"
+           "time": "2020-10-25T07:23:44+00:00"
        }
    ],
    "aliases": [],
@@ -1139,5 +1198,5 @@
        "ext-curl": "*"
    },
    "platform-dev": [],
-   "plugin-api-version": "1.1.0"
+   "plugin-api-version": "2.0.0"
}
diff --git a/errors/errors.go b/errors/errors.go
index def408d8..ec621b67 100755
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -39,22 +39,18 @@ type Kind uint8
 // Kinds of errors.
 const (
 	Undefined Kind = iota // Undefined error.
-	Network
-	Other
-	Test
+	ErrWatcherStopped
+	TimeOut
 )

 func (k Kind) String() string {
 	switch k {
 	case Undefined:
 		return "UNDEF"
-	case Network:
-		return "Network error"
-	case Other:
-		return "Other"
-	case Test:
-		return "Test"
-
+	case ErrWatcherStopped:
+		return "Watcher stopped"
+	case TimeOut:
+		return "TimedOut"
 	}
 	return "unknown error kind"
 }
diff --git a/errors/errors_test.go b/errors/errors_test.go
deleted file mode 100755
index 50d3d422..00000000
--- a/errors/errors_test.go
+++ /dev/null
@@ -1,177 +0,0 @@
-// +build !debug
-
-package errors
-
-import (
-	"fmt"
-	"io"
-	"os"
-	"os/exec"
-	"testing"
-)
-
-func TestDebug(t *testing.T) {
-	// Test with -tags debug to run the tests in debug_test.go
-	cmd := exec.Command("go", "test", "-tags", "prod")
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	if err := cmd.Run(); err != nil {
-		t.Fatalf("external go test failed: %v", err)
-	}
-}
-
-func TestMarshal(t *testing.T) {
-	// Single error. No user is set, so we will have a zero-length field inside.
-	e1 := E(Op("Get"), Network, "caching in progress")
-
-	// Nested error.
-	e2 := E(Op("Read"), Undefined, e1)
-
-	b := MarshalError(e2)
-	e3 := UnmarshalError(b)
-
-	in := e2.(*Error)
-	out := e3.(*Error)
-
-	// Compare elementwise.
-	if in.Op != out.Op {
-		t.Errorf("expected Op %q; got %q", in.Op, out.Op)
-	}
-	if in.Kind != out.Kind {
-		t.Errorf("expected kind %d; got %d", in.Kind, out.Kind)
-	}
-	// Note that error will have lost type information, so just check its Error string.
-	if in.Err.Error() != out.Err.Error() {
-		t.Errorf("expected Err %q; got %q", in.Err, out.Err)
-	}
-}
-
-func TestSeparator(t *testing.T) {
-	defer func(prev string) {
-		Separator = prev
-	}(Separator)
-	Separator = ":: "
-
-	// Single error. No user is set, so we will have a zero-length field inside.
-	e1 := E(Op("Get"), Network, "network error")
-
-	// Nested error.
-	e2 := E(Op("Get"), Network, e1)
-
-	want := "Get: Network error:: Get: network error"
-	if errorAsString(e2) != want {
-		t.Errorf("expected %q; got %q", want, e2)
-	}
-}
-
-func TestDoesNotChangePreviousError(t *testing.T) {
-	err := E(Network)
-	err2 := E(Op("I will NOT modify err"), err)
-
-	expected := "I will NOT modify err: Network error"
-	if errorAsString(err2) != expected {
-		t.Fatalf("Expected %q, got %q", expected, err2)
-	}
-	kind := err.(*Error).Kind
-	if kind != Network {
-		t.Fatalf("Expected kind %v, got %v", Network, kind)
-	}
-}
-
-//func TestNoArgs(t *testing.T) {
-//	defer func() {
-//		err := recover()
-//		if err == nil {
-//			t.Fatal("E() did not panic")
-//		}
-//	}()
-//	_ = E()
-//}
-
-type matchTest struct {
-	err1, err2 error
-	matched    bool
-}
-
-const (
-	op  = Op("Op")
-	op1 = Op("Op1")
-	op2 = Op("Op2")
-)
-
-var matchTests = []matchTest{
-	// Errors not of type *Error fail outright.
-	{nil, nil, false},
-	{io.EOF, io.EOF, false},
-	{E(io.EOF), io.EOF, false},
-	{io.EOF, E(io.EOF), false},
-	// Success. We can drop fields from the first argument and still match.
-	{E(io.EOF), E(io.EOF), true},
-	{E(op, Other, io.EOF), E(op, Other, io.EOF), true},
-	{E(op, Other, io.EOF, "test"), E(op, Other, io.EOF, "test", "test"), true},
-	{E(op, Other), E(op, Other, io.EOF, "test", "test"), true},
-	{E(op), E(op, Other, io.EOF, "test", "test"), true},
-	// Failure.
-	{E(io.EOF), E(io.ErrClosedPipe), false},
-	{E(op1), E(op2), false},
-	{E(Other), E(Network), false},
-	{E("test"), E("test1"), false},
-	{E(fmt.Errorf("error")), E(fmt.Errorf("error1")), false},
-	{E(op, Other, io.EOF, "test", "test1"), E(op, Other, io.EOF, "test", "test"), false},
-	{E("test", Str("something")), E("test"), false}, // Test nil error on rhs.
-	// Nested *Errors.
-	{E(op1, E("test")), E(op1, "1", E(op2, "2", "test")), true},
-	{E(op1, "test"), E(op1, "1", E(op2, "2", "test")), false},
-	{E(op1, E("test")), E(op1, "1", Str(E(op2, "2", "test").Error())), false},
-}
-
-func TestMatch(t *testing.T) {
-	for _, test := range matchTests {
-		matched := Match(test.err1, test.err2)
-		if matched != test.matched {
-			t.Errorf("Match(%q, %q)=%t; want %t", test.err1, test.err2, matched, test.matched)
-		}
-	}
-}
-
-type kindTest struct {
-	err  error
-	kind Kind
-	want bool
-}
-
-var kindTests = []kindTest{
-	// Non-Error errors.
-	{nil, Network, false},
-	{Str("not an *Error"), Network, false},
-
-	// Basic comparisons.
-	{E(Network), Network, true},
-	{E(Test), Network, false},
-	{E("no kind"), Network, false},
-	{E("no kind"), Other, false},
-
-	// Nested *Error values.
-	{E("Nesting", E(Network)), Network, true},
-	{E("Nesting", E(Test)), Network, false},
-	{E("Nesting", E("no kind")), Network, false},
-	{E("Nesting", E("no kind")), Other, false},
-}
-
-func TestKind(t *testing.T) {
-	for _, test := range kindTests {
-		got := Is(test.kind, test.err)
-		if got != test.want {
-			t.Errorf("Is(%q, %q)=%t; want %t", test.kind, test.err, got, test.want)
-		}
-	}
-}
-
-func errorAsString(err error) string {
-	if e, ok := err.(*Error); ok {
-		e2 := *e
-		e2.stack = stack{}
-		return e2.Error()
-	}
-	return err.Error()
-}
diff --git a/errors/go.mod b/errors/go.mod
deleted file mode 100755
index 1eaacc23..00000000
--- a/errors/go.mod
+++ /dev/null
@@ -1,3 +0,0 @@
-module github.com/48d90782/errors
-
-go 1.15
diff --git a/errors/marshal.go b/errors/marshal.go
index a13ec01f..7c8a63ef 100755
--- a/errors/marshal.go
+++ b/errors/marshal.go
@@ -35,7 +35,6 @@ func MarshalErrorAppend(err error, b []byte) []byte {
 	b = append(b, 'e')
 	b = appendString(b, err.Error())
 	return b
-
 }

 func MarshalError(err error) []byte {
diff --git a/pipe_factory.go b/pipe_factory.go
index a6c94614..807d7793 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -84,7 +84,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
 			}

 			// todo kill timeout
-			errK := w.Kill(ctx)
+			errK := w.Kill()
 			if errK != nil {
 				errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
 			}
@@ -164,8 +164,7 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
 			errs = append(errs, errF.Error())
 		}

-		// todo kill timeout ??
-		errK := w.Kill(context.Background())
+		errK := w.Kill()
 		if errK != nil {
 			errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
 		}
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 9f401bec..2311b7bf 100755
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -61,11 +61,11 @@ func (f *Foo2) Serve() chan error {
 		MaxJobs:         100,
 		AllocateTimeout: time.Second * 10,
 		DestroyTimeout:  time.Second * 10,
-		Supervisor: roadrunner.SupervisorConfig{
+		Supervisor: &roadrunner.SupervisorConfig{
 			WatchTick:       60,
 			TTL:             1000,
 			IdleTTL:         10,
-			ExecTTL:         time.Second * 10,
+			ExecTTL:         10,
 			MaxWorkerMemory: 1000,
 		},
 	}
diff --git a/pool.go b/pool.go
--- a/pool.go
+++ b/pool.go
@@ -54,6 +54,8 @@ type Pool interface {
 	// Exec
 	Exec(rqs Payload) (Payload, error)

+	ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+
 	// Workers returns worker list associated with the pool.
 	Workers() (workers []WorkerBase)

@@ -87,7 +89,7 @@ type Config struct {
 	DestroyTimeout time.Duration

 	// Supervision config to limit worker and pool memory usage.
-	Supervisor SupervisorConfig
+	Supervisor *SupervisorConfig
 }

 // InitDefaults enables default config values.
@@ -103,22 +105,24 @@ func (cfg *Config) InitDefaults() {
 	if cfg.DestroyTimeout == 0 {
 		cfg.DestroyTimeout = time.Minute
 	}
-
+	if cfg.Supervisor == nil {
+		return
+	}
 	cfg.Supervisor.InitDefaults()
 }

 type SupervisorConfig struct {
 	// WatchTick defines how often to check the state of worker.
-	WatchTick time.Duration
+	WatchTick uint64

 	// TTL defines maximum time worker is allowed to live.
-	TTL int64
+	TTL uint64

 	// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
-	IdleTTL int64
+	IdleTTL uint64

 	// ExecTTL defines maximum lifetime per job.
-	ExecTTL time.Duration
+	ExecTTL uint64

 	// MaxWorkerMemory limits memory per worker.
 	MaxWorkerMemory uint64
@@ -127,6 +131,6 @@ type SupervisorConfig struct {
 // InitDefaults enables default config values.
 func (cfg *SupervisorConfig) InitDefaults() {
 	if cfg.WatchTick == 0 {
-		cfg.WatchTick = time.Second
+		cfg.WatchTick = 1
 	}
 }
diff --git a/socket_factory.go b/socket_factory.go
index ed151f2d..6f29db22 100755
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -109,7 +109,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
 		if err != nil {
 			err = multierr.Combine(
 				err,
-				w.Kill(context.Background()),
+				w.Kill(),
 				w.Wait(context.Background()),
 			)
@@ -158,7 +158,7 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
 	rl, err := f.findRelay(w)
 	if err != nil {
 		errs = append(errs, err.Error())
-		err = w.Kill(ctx)
+		err = w.Kill()
 		if err != nil {
 			errs = append(errs, err.Error())
 		}
diff --git a/state.go b/state.go
--- a/state.go
+++ b/state.go
@@ -43,7 +43,6 @@ const (
 	StateStopping

 	StateKilling
-	StateKilled

 	// State of worker, when no need to allocate new one
 	StateDestroyed
diff --git a/static_pool.go b/static_pool.go
index 6f247d9e..be7ad6e3 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -4,11 +4,9 @@ import (
 	"context"
 	"fmt"
 	"os/exec"
-	"sync"

+	"github.com/spiral/roadrunner/v2/errors"
 	"github.com/spiral/roadrunner/v2/util"
-
-	"github.com/pkg/errors"
 )

 // StopRequest can be sent by worker to indicate that restart is required.
@@ -29,14 +27,8 @@ type StaticPool struct {
 	// distributes the events
 	events *util.EventHandler

-	// protects state of worker list, does not affect allocation
-	muw sync.RWMutex
-
 	// manages worker states and TTLs
 	ww *workerWatcher
-
-	// supervises memory and TTL of workers
-	// sp *supervisedPool
 }

 // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -79,9 +71,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
 		return nil, err
 	}

-	// todo: implement
-	// p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
-	// p.sp.Start()
+	// if the supervisor config is not nil, the pool is meant to be supervised
+	if cfg.Supervisor != nil {
+		sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+		// start watcher timer
+		sp.Start()
+		return sp, nil
+	}

 	return p, nil
 }
@@ -106,13 +102,13 @@ func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
 }

 func (sp *StaticPool) Exec(p Payload) (Payload, error) {
+	const op = errors.Op("Exec")
 	if sp.cfg.Debug {
 		return sp.execDebug(p)
 	}
 	w, err := sp.ww.GetFreeWorker(context.Background())
-	if err != nil && errors.Is(err, ErrWatcherStopped) {
-		return EmptyPayload, ErrWatcherStopped
+	if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+		return EmptyPayload, errors.E(op, err)
 	} else if err != nil {
 		return EmptyPayload, err
 	}
@@ -189,76 +185,71 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
 	return r, err
 }

-// Exec one task with given payload and context, returns result or error.
-// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
-// 	// todo: why TODO passed here?
-// 	getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
-// 	defer cancel()
-// 	w, err := p.ww.GetFreeWorker(getWorkerCtx)
-// 	if err != nil && errors.Is(err, ErrWatcherStopped) {
-// 		return EmptyPayload, ErrWatcherStopped
-// 	} else if err != nil {
-// 		return EmptyPayload, err
-// 	}
-//
-// 	sw := w.(SyncWorker)
-//
-// 	// todo: implement worker destroy
-// 	//execCtx context.Context
-// 	//if p.cfg.Supervisor.ExecTTL != 0 {
-// 	//	var cancel2 context.CancelFunc
-// 	//	execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
-// 	//	defer cancel2()
-// 	//} else {
-// 	//	execCtx = ctx
-// 	//}
-//
-// 	rsp, err := sw.Exec(rqs)
-// 	if err != nil {
-// 		errJ := p.checkMaxJobs(ctx, w)
-// 		if errJ != nil {
-// 			// todo: worker was not destroyed
-// 			return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
-// 		}
-//
-// 		// soft job errors are allowed
-// 		if _, jobError := err.(ExecError); jobError {
-// 			p.ww.PushWorker(w)
-// 			return EmptyPayload, err
-// 		}
-//
-// 		sw.State().Set(StateInvalid)
-// 		errS := w.Stop(ctx)
-// 		if errS != nil {
-// 			return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
-// 		}
-//
-// 		return EmptyPayload, err
-// 	}
-//
-// 	// worker wants to be terminated
-// 	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-// 		w.State().Set(StateInvalid)
-// 		err = w.Stop(ctx)
-// 		if err != nil {
-// 			return EmptyPayload, err
-// 		}
-// 		return p.ExecWithContext(ctx, rqs)
-// 	}
-//
-// 	if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-// 		err = p.ww.AllocateNew(ctx)
-// 		if err != nil {
-// 			return EmptyPayload, err
-// 		}
-// 	} else {
-// 		p.muw.Lock()
-// 		p.ww.PushWorker(w)
-// 		p.muw.Unlock()
-// 	}
-//
-// 	return rsp, nil
-// }
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+	const op = errors.Op("Exec")
+	bCtx := context.Background() // background context for lifecycle calls below
+	w, err := sp.ww.GetFreeWorker(bCtx)
+	if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+		return EmptyPayload, errors.E(op, err)
+	} else if err != nil {
+		return EmptyPayload, err
+	}
+
+	sw := w.(SyncWorker)
+
+	rsp, err := sw.ExecWithContext(ctx, rqs)
+	if err != nil {
+		// soft job errors are allowed
+		if _, jobError := err.(ExecError); jobError {
+			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+				err := sp.ww.AllocateNew(bCtx)
+				if err != nil {
+					sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+				}
+
+				w.State().Set(StateInvalid)
+				err = w.Stop(bCtx)
+				if err != nil {
+					sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+				}
+			} else {
+				sp.ww.PushWorker(w)
+			}
+
+			return EmptyPayload, err
+		}
+
+		sw.State().Set(StateInvalid)
+		sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+		errS := w.Stop(bCtx)
+
+		if errS != nil {
+			return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+		}
+
+		return EmptyPayload, err
+	}
+
+	// worker wants to be terminated
+	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+		w.State().Set(StateInvalid)
+		err = w.Stop(bCtx)
+		if err != nil {
+			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+		}
+
+		return sp.Exec(rqs)
+	}
+
+	if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+		err = sp.ww.AllocateNew(bCtx)
+		if err != nil {
+			return EmptyPayload, err
+		}
+	} else {
+		sp.ww.PushWorker(w)
+	}
+	return rsp, nil
+}

 // Destroy all underlying stack (but let them to complete the task).
 func (sp *StaticPool) Destroy(ctx context.Context) {
diff --git a/static_pool_test.go b/static_pool_test.go
index b75bd0bf..f1e3e4e4 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -235,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	})

 	// killing random worker and expecting pool to replace it
-	err = p.Workers()[0].Kill(ctx)
+	err = p.Workers()[0].Kill()
 	if err != nil {
 		t.Errorf("error killing the process: error %v", err)
 	}
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 9d1d2b1e..5dca3c22 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -2,8 +2,10 @@ package roadrunner

 import (
 	"context"
+	"sync"
 	"time"

+	"github.com/spiral/roadrunner/v2/errors"
 	"github.com/spiral/roadrunner/v2/util"
 )

@@ -11,31 +13,101 @@ const MB = 1024 * 1024

 type SupervisedPool interface {
 	Pool
-
-	// ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context
-	// deadline reached.
-	ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+	// Start used to start watching process for all pool workers
+	Start()
 }

 type supervisedPool struct {
-	cfg    SupervisorConfig
+	cfg    *SupervisorConfig
 	events *util.EventHandler
 	pool   Pool
 	stopCh chan struct{}
+	mu     *sync.RWMutex
 }

-func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
-	return &supervisedPool{
+func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool {
+	sp := &supervisedPool{
 		cfg:    cfg,
 		events: events,
 		pool:   pool,
+		mu:     &sync.RWMutex{},
 		stopCh: make(chan struct{}),
 	}
+	return sp
+}
+
+type ttlExec struct {
+	err error
+	p   Payload
+}
+
+func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+	const op = errors.Op("exec_supervised")
+	if sp.cfg.ExecTTL == 0 {
+		return sp.pool.Exec(rqs)
+	}
+
+	c := make(chan ttlExec, 1)
+	ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+	defer cancel()
+	go func() {
+		res, err := sp.pool.ExecWithContext(ctx, rqs)
+		if err != nil {
+			c <- ttlExec{
+				err: err,
+				p:   EmptyPayload,
+			}
+			return // do not fall through to the success send below
+		}
+
+		c <- ttlExec{
+			err: nil,
+			p:   res,
+		}
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err())
+		case res := <-c:
+			if res.err != nil {
+				return EmptyPayload, res.err
+			}
+
+			return res.p, nil
+		}
+	}
+}
+
+func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
+	return sp.pool.Exec(p)
+}
+
+func (sp *supervisedPool) AddListener(listener util.EventListener) {
+	sp.pool.AddListener(listener)
+}
+
+func (sp *supervisedPool) GetConfig() Config {
+	return sp.pool.GetConfig()
+}
+
+func (sp *supervisedPool) Workers() (workers []WorkerBase) {
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	return sp.pool.Workers()
+}
+
+func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error {
+	return sp.pool.RemoveWorker(ctx, worker)
+}
+
+func (sp *supervisedPool) Destroy(ctx context.Context) {
+	sp.pool.Destroy(ctx)
 }

 func (sp *supervisedPool) Start() {
 	go func() {
-		watchTout := time.NewTicker(sp.cfg.WatchTick)
+		watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
 		for {
 			select {
 			case <-sp.stopCh:
@@ -43,7 +115,9 @@ func (sp *supervisedPool) Start() {
 				return
 			// stop here
 			case <-watchTout.C:
+				sp.mu.Lock()
 				sp.control()
+				sp.mu.Unlock()
 			}
 		}
 	}()
@@ -89,7 +163,7 @@ func (sp *supervisedPool) control() {
 				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
 				return
 			} else {
-				sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+				sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
 			}

 			continue
@@ -116,7 +190,7 @@ func (sp *supervisedPool) control() {
 		res := int64(lu) - now.UnixNano()

 		// maxWorkerIdle more than diff between now and last used
-		if sp.cfg.IdleTTL-res <= 0 {
+		if sp.cfg.IdleTTL-uint64(res) <= 0 {
 			err = sp.pool.RemoveWorker(ctx, workers[i])
 			if err != nil {
 				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
diff --git a/supervisor_test.go b/supervisor_test.go
new file mode 100644
index 00000000..34172d7d
--- /dev/null
+++ b/supervisor_test.go
@@ -0,0 +1,150 @@
+package roadrunner
+
+import (
+	"context"
+	"os/exec"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var cfgSupervised = Config{
+	NumWorkers:      int64(1),
+	AllocateTimeout: time.Second,
+	DestroyTimeout:  time.Second,
+	Supervisor: &SupervisorConfig{
+		WatchTick:       1,
+		TTL:             100,
+		IdleTTL:         100,
+		ExecTTL:         100,
+		MaxWorkerMemory: 100,
+	},
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+	ctx := context.Background()
+	p, err := NewPool(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "tests/memleak.php", "pipes") },
+		NewPipeFactory(),
+		cfgSupervised,
+	)
+
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+	stopCh := make(chan struct{})
+	defer p.Destroy(context.Background())
+
+	go func() {
+		for {
+			select {
+			case <-stopCh:
+				return
+			default:
+				workers := p.Workers()
+				if len(workers) > 0 {
+					s, err := WorkerProcessState(workers[0])
+					assert.NoError(t, err)
+					assert.NotNil(t, s)
+					// since this is soft limit, double max memory limit watch
+					if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
+						assert.Fail(t, "max memory reached")
+					}
+				}
+			}
+		}
+	}()
+
+	for i := 0; i < 100; i++ {
+		time.Sleep(time.Millisecond * 50)
+		_, err = p.Exec(Payload{
+			Context: []byte(""),
+			Body:    []byte("foo"),
+		})
+		assert.NoError(t, err)
+	}
+
+	stopCh <- struct{}{}
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+	var cfgExecTTL = Config{
+		NumWorkers:      int64(1),
+		AllocateTimeout: time.Second,
+		DestroyTimeout:  time.Second,
+		Supervisor: &SupervisorConfig{
+			WatchTick:       1,
+			TTL:             100,
+			IdleTTL:         100,
+			ExecTTL:         1,
+			MaxWorkerMemory: 100,
+		},
+	}
+	ctx := context.Background()
+	p, err := NewPool(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
+		NewPipeFactory(),
+		cfgExecTTL,
+	)
+
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+	defer p.Destroy(context.Background())
+
+	pid := p.Workers()[0].Pid()
+
+	resp, err := p.ExecWithContext(context.Background(), Payload{
+		Context: []byte(""),
+		Body:    []byte("foo"),
+	})
+
+	assert.Error(t, err)
+	assert.Empty(t, resp)
+
+	time.Sleep(time.Second * 1)
+	// should be new worker with new pid
+	assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+	var cfgExecTTL = Config{
+		NumWorkers:      int64(1),
+		AllocateTimeout: time.Second,
+		DestroyTimeout:  time.Second,
+		Supervisor: &SupervisorConfig{
+			WatchTick:       1,
+			TTL:             100,
+			IdleTTL:         100,
+			ExecTTL:         4,
+			MaxWorkerMemory: 100,
+		},
+	}
+	ctx := context.Background()
+	p, err := NewPool(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
+		NewPipeFactory(),
+		cfgExecTTL,
+	)
+
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+	defer p.Destroy(context.Background())
+
+	pid := p.Workers()[0].Pid()
+
+	time.Sleep(time.Millisecond * 100)
+	resp, err := p.Exec(Payload{
+		Context: []byte(""),
+		Body:    []byte("foo"),
+	})
+
+	assert.NoError(t, err)
+	assert.Empty(t, resp)
+
+	time.Sleep(time.Second * 1)
+	// should be the same pid
+	assert.Equal(t, pid, p.Workers()[0].Pid())
+}
diff --git a/sync_worker.go b/sync_worker.go
index 2f3eb1e4..d933077b 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,9 +5,10 @@ import (
 	"fmt"
 	"time"

+	"github.com/spiral/roadrunner/v2/errors"
 	"github.com/spiral/roadrunner/v2/util"
+	"go.uber.org/multierr"

-	"github.com/pkg/errors"
 	"github.com/spiral/goridge/v2"
 )

@@ -18,7 +19,9 @@ type SyncWorker interface {
 	WorkerBase

 	// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
-	Exec(p Payload) (Payload, error)
+	Exec(rqs Payload) (Payload, error)
+	// ExecWithContext used to handle Exec with TTL
+	ExecWithContext(ctx context.Context, p Payload) (Payload, error)
 }

 type syncWorker struct {
@@ -60,14 +63,82 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
 	return rsp, nil
 }

+type wexec struct {
+	payload Payload
+	err     error
+}
+
+// ExecWithContext executes the payload, enforcing the TTL via the passed context.
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) {
+	const op = errors.Op("exec_with_context")
+	c := make(chan wexec, 1)
+	go func() {
+		if len(p.Body) == 0 && len(p.Context) == 0 {
+			c <- wexec{
+				payload: EmptyPayload,
+				err:     errors.E(op, errors.Str("payload can not be empty")),
+			}
+			return
+		}
+
+		if tw.w.State().Value() != StateReady {
+			c <- wexec{
+				payload: EmptyPayload,
+				err:     errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())),
+			}
+			return
+		}
+
+		// set last used time
+		tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+		tw.w.State().Set(StateWorking)
+
+		rsp, err := tw.execPayload(p)
+		if err != nil {
+			if _, ok := err.(ExecError); !ok {
+				tw.w.State().Set(StateErrored)
+				tw.w.State().RegisterExec()
+			}
+			c <- wexec{
+				payload: EmptyPayload,
+				err:     errors.E(op, err),
+			}
+			return
+		}
+
+		tw.w.State().Set(StateReady)
+		tw.w.State().RegisterExec()
+
+		c <- wexec{
+			payload: rsp,
+			err:     nil,
+		}
+	}()
+
+	select {
+	case <-ctx.Done():
+		err := multierr.Combine(tw.Kill())
+		if err != nil {
+			return EmptyPayload, multierr.Append(err, ctx.Err())
+		}
+		return EmptyPayload, ctx.Err()
+	case res := <-c:
+		if res.err != nil {
+			return EmptyPayload, res.err
+		}
+		return res.payload, nil
+	}
+}
+
 func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
+	const op = errors.Op("exec_payload")
 	// two things; todo: merge
 	if err := sendControl(tw.w.Relay(), p.Context); err != nil {
-		return EmptyPayload, errors.Wrap(err, "header error")
+		return EmptyPayload, errors.E(op, err, "header error")
 	}

 	if err := tw.w.Relay().Send(p.Body, 0); err != nil {
-		return EmptyPayload, errors.Wrap(err, "sender error")
+		return EmptyPayload, errors.E(op, err, "sender error")
 	}

 	var pr goridge.Prefix
@@ -75,7 +146,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
 	var err error

 	if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
-		return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+		return EmptyPayload, errors.E(op, err, "WorkerProcess error")
 	}

 	if !pr.HasFlag(goridge.PayloadControl) {
@@ -88,7 +159,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {

 	// add streaming support :)
 	if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
-		return EmptyPayload, errors.E(op, err, "WorkerProcess error")
+		return EmptyPayload, errors.E(op, err, "WorkerProcess error")
 	}

 	return rsp, nil
@@ -126,8 +197,8 @@ func (tw *syncWorker) Stop(ctx context.Context) error {
 	return tw.w.Stop(ctx)
 }

-func (tw *syncWorker) Kill(ctx context.Context) error {
-	return tw.w.Kill(ctx)
+func (tw *syncWorker) Kill() error {
+	return tw.w.Kill()
 }

 func (tw *syncWorker) Relay() goridge.Relay {
diff --git a/tests/memleak.php b/tests/memleak.php
new file mode 100644
index 00000000..b78a76c0
--- /dev/null
+++ b/tests/memleak.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require dirname(__DIR__) . "/vendor_php/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+$mem = '';
+while($rr->receive($ctx)){
+    $mem .= str_repeat(" ", 1024*1024);
+    $rr->send("");
+}
\ No newline at end of file
diff --git a/tests/sleep.php b/tests/sleep.php
new file mode 100644
index 00000000..b3ea8235
--- /dev/null
+++ b/tests/sleep.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require dirname(__DIR__) . "/vendor_php/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->receive($ctx)){
+    sleep(3);
+    $rr->send("");
+}
\ No newline at end of file
diff --git a/worker.go b/worker.go
--- a/worker.go
+++ b/worker.go
@@ -74,7 +74,7 @@ type WorkerBase interface {

 	// Kill kills underlying process, make sure to call Wait() func to gather
 	// error log from the stderr. Does not wait for process completion!
-	Kill(ctx context.Context) error
+	Kill() error

 	// Relay returns attached to worker goridge relay
 	Relay() goridge.Relay
@@ -280,7 +280,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {

 // Kill kills underlying process, make sure to call Wait() func to gather
 // error log from the stderr. Does not wait for process completion!
-func (w *WorkerProcess) Kill(ctx context.Context) error {
+func (w *WorkerProcess) Kill() error {
 	w.state.Set(StateKilling)
 	w.mu.Lock()
 	defer w.mu.Unlock()
diff --git a/worker_test.go b/worker_test.go
index d2744345..78738064 100755
--- a/worker_test.go
+++ b/worker_test.go
@@ -47,7 +47,7 @@ func Test_Kill(t *testing.T) {
 	assert.NotNil(t, w)
 	assert.Equal(t, StateReady, w.State().Value())

-	err = w.Kill(ctx)
+	err = w.Kill()
 	if err != nil {
 		t.Errorf("error killing the WorkerProcess: error %v", err)
 	}
diff --git a/worker_watcher.go b/worker_watcher.go
index 5ae54024..d289750e 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -2,15 +2,14 @@ package roadrunner

 import (
 	"context"
-	"errors"
+	"runtime"
 	"sync"
 	"time"

+	"github.com/spiral/roadrunner/v2/errors"
 	"github.com/spiral/roadrunner/v2/util"
 )

-var ErrWatcherStopped = errors.New("watcher stopped")
-
 type Stack struct {
 	workers []WorkerBase
 	mutex   sync.RWMutex
@@ -19,7 +18,7 @@ type Stack struct {

 func NewWorkersStack() *Stack {
 	return &Stack{
-		workers: make([]WorkerBase, 0),
+		workers: make([]WorkerBase, 0, runtime.NumCPU()),
 	}
 }

@@ -85,11 +84,7 @@ type WorkerWatcher interface {
 }

 // workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func newWorkerWatcher(
-	allocator func(args ...interface{}) (WorkerBase, error),
-	numWorkers int64,
-	events *util.EventHandler,
-) *workerWatcher {
+func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher {
 	ww := &workerWatcher{
 		stack:     NewWorkersStack(),
 		allocator: allocator,
@@ -127,10 +122,11 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
 }

 func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+	const op = errors.Op("get_free_worker")
 	// thread safe operation
 	w, stop := ww.stack.Pop()
 	if stop {
-		return nil, ErrWatcherStopped
+		return nil, errors.E(op, errors.ErrWatcherStopped)
 	}

 	// handle worker remove state
@@ -146,6 +142,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)

 	// no free stack
 	if w == nil {
+		// TODO allocate timeout
 		tout := time.NewTicker(time.Second * 180)
 		defer tout.Stop()
 		for {
@@ -153,20 +150,20 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
 			default:
 				w, stop = ww.stack.Pop()
 				if stop {
-					return nil, ErrWatcherStopped
+					return nil, errors.E(op, errors.ErrWatcherStopped)
 				}
 				if w == nil {
 					continue
 				}
-				ww.decreaseNumOfActualWorkers()
+				ww.ReduceWorkersCount()
 				return w, nil
 			case <-tout.C:
-				return nil, errors.New("no free stack")
+				return nil, errors.Str("no free stack")
 			}
 		}
 	}

-	ww.decreaseNumOfActualWorkers()
+	ww.ReduceWorkersCount()
 	return w, nil
 }

@@ -198,10 +195,10 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
 			// found in the stack
 			// remove worker
 			ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
-			ww.decreaseNumOfActualWorkers()
+			ww.ReduceWorkersCount()

 			wb.State().Set(StateInvalid)
-			err := wb.Kill(ctx)
+			err := wb.Kill()
 			if err != nil {
 				return err
 			}
@@ -215,14 +212,19 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error

 // O(1) operation
 func (ww *workerWatcher) PushWorker(w WorkerBase) {
-	ww.mutex.Lock()
-	ww.actualNumWorkers++
-	ww.mutex.Unlock()
+	ww.IncreaseWorkersCount()
 	ww.stack.Push(w)
 }

 func (ww *workerWatcher) ReduceWorkersCount() {
-	ww.decreaseNumOfActualWorkers()
+	ww.mutex.Lock()
+	ww.actualNumWorkers--
+	ww.mutex.Unlock()
+}
+
+func (ww *workerWatcher) IncreaseWorkersCount() {
+	ww.mutex.Lock()
+	ww.actualNumWorkers++
+	ww.mutex.Unlock()
 }

 // Destroy all underlying stack (but let them to complete the task)
@@ -258,9 +260,17 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
 	}
 }

-// Warning, this is O(n) operation
+// Warning, this is O(n) operation, and it will return copy of the actual workers
 func (ww *workerWatcher) WorkersList() []WorkerBase {
-	return ww.stack.workers
+	ww.stack.mutex.Lock()
+	defer ww.stack.mutex.Unlock()
+	workersCopy := make([]WorkerBase, 0, 1)
+	for _, v := range ww.stack.workers {
+		sw := v.(SyncWorker)
+		workersCopy = append(workersCopy, sw)
+	}
+
+	return workersCopy
 }

 func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
@@ -284,7 +294,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
 			// worker in the stack, reallocating
 			if ww.stack.workers[i].Pid() == pid {
 				ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
-				ww.decreaseNumOfActualWorkers()
+				ww.ReduceWorkersCount()
 				ww.stack.mutex.Unlock()

 				err = ww.AllocateNew(ctx)
@@ -321,9 +331,3 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
 		ww.wait(context.Background(), wb)
 	}()
 }
-
-func (ww *workerWatcher) decreaseNumOfActualWorkers() {
-	ww.mutex.Lock()
-	ww.actualNumWorkers--
-	ww.mutex.Unlock()
-}
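
Both syncWorker.ExecWithContext and supervisedPool.ExecWithContext above rely on the same Go idiom: run the blocking exec in a goroutine, hand its result back over a buffered channel, and select against the context deadline. Below is a standalone sketch of that pattern under illustrative names (execWithDeadline is not part of the RoadRunner API); note the buffered channel and the early return, which keep the goroutine from leaking when the deadline wins.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	body string
	err  error
}

// execWithDeadline runs work under a TTL, mirroring the shape of
// supervisedPool.ExecWithContext; ttl == 0 disables the deadline.
func execWithDeadline(ctx context.Context, ttl time.Duration, work func() (string, error)) (string, error) {
	if ttl == 0 {
		return work()
	}

	ctx, cancel := context.WithTimeout(ctx, ttl)
	defer cancel()

	c := make(chan result, 1) // buffered so the send never blocks
	go func() {
		body, err := work()
		c <- result{body: body, err: err}
	}()

	select {
	case <-ctx.Done():
		// This is the point where the pool kills the worker.
		return "", fmt.Errorf("exec timed out: %w", ctx.Err())
	case res := <-c:
		return res.body, res.err
	}
}

func main() {
	_, err := execWithDeadline(context.Background(), time.Second, func() (string, error) {
		time.Sleep(3 * time.Second) // stands in for tests/sleep.php
		return "ok", nil
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
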