author    Valery Piashchynski <[email protected]>  2020-10-27 15:16:55 +0300
committer Valery Piashchynski <[email protected]>  2020-10-27 15:16:55 +0300
commit    d199ef71b9644afbbba064c317cd0991be1c2443 (patch)
tree      f777eb90f10ca0e7dbc46227fc76c61f02111946
parent    91cf918b30938129609323ded53e190385e019a6 (diff)
Supervised pool
-rwxr-xr-x  composer.lock                        167
-rwxr-xr-x  errors/errors.go                      15
-rwxr-xr-x  errors/go.mod                          3
-rwxr-xr-x  pipe_factory.go                        5
-rwxr-xr-x  plugins/factory/tests/plugin_2.go      4
-rwxr-xr-x  pool.go                               18
-rwxr-xr-x  socket_factory.go                      4
-rwxr-xr-x  state.go                               1
-rwxr-xr-x  static_pool.go                       158
-rwxr-xr-x  static_pool_test.go                    2
-rwxr-xr-x  supervisor_pool.go                    93
-rw-r--r--  supervisor_test.go                   145
-rwxr-xr-x  sync_worker.go                        84
-rw-r--r--  tests/memleak.php                     15
-rw-r--r--  tests/sleep.php                       15
-rwxr-xr-x  worker.go                              4
-rwxr-xr-x  worker_test.go                         2
-rwxr-xr-x  worker_watcher.go                     59
18 files changed, 585 insertions, 209 deletions
diff --git a/composer.lock b/composer.lock
index 7a1094b1..13e6af6b 100755
--- a/composer.lock
+++ b/composer.lock
@@ -89,6 +89,14 @@
"psr-17",
"psr-7"
],
+ "support": {
+ "chat": "https://laminas.dev/chat",
+ "docs": "https://docs.laminas.dev/laminas-diactoros/",
+ "forum": "https://discourse.laminas.dev",
+ "issues": "https://github.com/laminas/laminas-diactoros/issues",
+ "rss": "https://github.com/laminas/laminas-diactoros/releases.atom",
+ "source": "https://github.com/laminas/laminas-diactoros"
+ },
"funding": [
{
"url": "https://funding.communitybridge.org/projects/laminas-project",
@@ -143,6 +151,12 @@
"laminas",
"zf"
],
+ "support": {
+ "forum": "https://discourse.laminas.dev/",
+ "issues": "https://github.com/laminas/laminas-zendframework-bridge/issues",
+ "rss": "https://github.com/laminas/laminas-zendframework-bridge/releases.atom",
+ "source": "https://github.com/laminas/laminas-zendframework-bridge"
+ },
"funding": [
{
"url": "https://funding.communitybridge.org/projects/laminas-project",
@@ -198,6 +212,10 @@
"container-interop",
"psr"
],
+ "support": {
+ "issues": "https://github.com/php-fig/container/issues",
+ "source": "https://github.com/php-fig/container/tree/master"
+ },
"time": "2017-02-14T16:28:37+00:00"
},
{
@@ -250,6 +268,9 @@
"request",
"response"
],
+ "support": {
+ "source": "https://github.com/php-fig/http-factory/tree/master"
+ },
"time": "2019-04-30T12:38:16+00:00"
},
{
@@ -300,6 +321,9 @@
"request",
"response"
],
+ "support": {
+ "source": "https://github.com/php-fig/http-message/tree/master"
+ },
"time": "2016-08-06T14:39:51+00:00"
},
{
@@ -345,20 +369,24 @@
}
],
"description": "High-performance PHP-to-Golang RPC bridge",
+ "support": {
+ "issues": "https://github.com/spiral/goridge-php/issues",
+ "source": "https://github.com/spiral/goridge-php/tree/v2.4.5"
+ },
"time": "2020-08-14T14:28:30+00:00"
},
{
"name": "symfony/console",
- "version": "v5.1.6",
+ "version": "v5.1.7",
"source": {
"type": "git",
"url": "https://github.com/symfony/console.git",
- "reference": "04c3a31fe8ea94b42c9e2d1acc93d19782133b00"
+ "reference": "ae789a8a2ad189ce7e8216942cdb9b77319f5eb8"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/console/zipball/04c3a31fe8ea94b42c9e2d1acc93d19782133b00",
- "reference": "04c3a31fe8ea94b42c9e2d1acc93d19782133b00",
+ "url": "https://api.github.com/repos/symfony/console/zipball/ae789a8a2ad189ce7e8216942cdb9b77319f5eb8",
+ "reference": "ae789a8a2ad189ce7e8216942cdb9b77319f5eb8",
"shasum": ""
},
"require": {
@@ -424,6 +452,9 @@
],
"description": "Symfony Console Component",
"homepage": "https://symfony.com",
+ "support": {
+ "source": "https://github.com/symfony/console/tree/v5.1.7"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -438,24 +469,24 @@
"type": "tidelift"
}
],
- "time": "2020-09-18T14:27:32+00:00"
+ "time": "2020-10-07T15:23:00+00:00"
},
{
"name": "symfony/polyfill-ctype",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-ctype.git",
- "reference": "1c302646f6efc070cd46856e600e5e0684d6b454"
+ "reference": "f4ba089a5b6366e453971d3aad5fe8e897b37f41"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/1c302646f6efc070cd46856e600e5e0684d6b454",
- "reference": "1c302646f6efc070cd46856e600e5e0684d6b454",
+ "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/f4ba089a5b6366e453971d3aad5fe8e897b37f41",
+ "reference": "f4ba089a5b6366e453971d3aad5fe8e897b37f41",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-ctype": "For best performance"
@@ -463,7 +494,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -500,6 +531,9 @@
"polyfill",
"portable"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-ctype/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -514,24 +548,24 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-intl-grapheme",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-intl-grapheme.git",
- "reference": "b740103edbdcc39602239ee8860f0f45a8eb9aa5"
+ "reference": "c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/b740103edbdcc39602239ee8860f0f45a8eb9aa5",
- "reference": "b740103edbdcc39602239ee8860f0f45a8eb9aa5",
+ "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c",
+ "reference": "c7cf3f858ec7d70b89559d6e6eb1f7c2517d479c",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-intl": "For best performance"
@@ -539,7 +573,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -578,6 +612,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-intl-grapheme/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -592,24 +629,24 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-intl-normalizer",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-intl-normalizer.git",
- "reference": "37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e"
+ "reference": "727d1096295d807c309fb01a851577302394c897"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e",
- "reference": "37078a8dd4a2a1e9ab0231af7c6cb671b2ed5a7e",
+ "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/727d1096295d807c309fb01a851577302394c897",
+ "reference": "727d1096295d807c309fb01a851577302394c897",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-intl": "For best performance"
@@ -617,7 +654,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -659,6 +696,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-intl-normalizer/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -673,24 +713,24 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-mbstring",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-mbstring.git",
- "reference": "a6977d63bf9a0ad4c65cd352709e230876f9904a"
+ "reference": "39d483bdf39be819deabf04ec872eb0b2410b531"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/a6977d63bf9a0ad4c65cd352709e230876f9904a",
- "reference": "a6977d63bf9a0ad4c65cd352709e230876f9904a",
+ "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/39d483bdf39be819deabf04ec872eb0b2410b531",
+ "reference": "39d483bdf39be819deabf04ec872eb0b2410b531",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"suggest": {
"ext-mbstring": "For best performance"
@@ -698,7 +738,7 @@
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -736,6 +776,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -750,29 +793,29 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-php73",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-php73.git",
- "reference": "fffa1a52a023e782cdcc221d781fe1ec8f87fcca"
+ "reference": "8ff431c517be11c78c48a39a66d37431e26a6bed"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-php73/zipball/fffa1a52a023e782cdcc221d781fe1ec8f87fcca",
- "reference": "fffa1a52a023e782cdcc221d781fe1ec8f87fcca",
+ "url": "https://api.github.com/repos/symfony/polyfill-php73/zipball/8ff431c517be11c78c48a39a66d37431e26a6bed",
+ "reference": "8ff431c517be11c78c48a39a66d37431e26a6bed",
"shasum": ""
},
"require": {
- "php": ">=5.3.3"
+ "php": ">=7.1"
},
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -812,6 +855,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-php73/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -826,29 +872,29 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/polyfill-php80",
- "version": "v1.18.1",
+ "version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-php80.git",
- "reference": "d87d5766cbf48d72388a9f6b85f280c8ad51f981"
+ "reference": "e70aa8b064c5b72d3df2abd5ab1e90464ad009de"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/d87d5766cbf48d72388a9f6b85f280c8ad51f981",
- "reference": "d87d5766cbf48d72388a9f6b85f280c8ad51f981",
+ "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/e70aa8b064c5b72d3df2abd5ab1e90464ad009de",
+ "reference": "e70aa8b064c5b72d3df2abd5ab1e90464ad009de",
"shasum": ""
},
"require": {
- "php": ">=7.0.8"
+ "php": ">=7.1"
},
"type": "library",
"extra": {
"branch-alias": {
- "dev-master": "1.18-dev"
+ "dev-main": "1.20-dev"
},
"thanks": {
"name": "symfony/polyfill",
@@ -892,6 +938,9 @@
"portable",
"shim"
],
+ "support": {
+ "source": "https://github.com/symfony/polyfill-php80/tree/v1.20.0"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -906,7 +955,7 @@
"type": "tidelift"
}
],
- "time": "2020-07-14T12:35:20+00:00"
+ "time": "2020-10-23T14:02:19+00:00"
},
{
"name": "symfony/service-contracts",
@@ -968,6 +1017,9 @@
"interoperability",
"standards"
],
+ "support": {
+ "source": "https://github.com/symfony/service-contracts/tree/master"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -986,7 +1038,7 @@
},
{
"name": "symfony/string",
- "version": "v5.1.6",
+ "version": "v5.1.7",
"source": {
"type": "git",
"url": "https://github.com/symfony/string.git",
@@ -1053,6 +1105,9 @@
"utf-8",
"utf8"
],
+ "support": {
+ "source": "https://github.com/symfony/string/tree/v5.1.7"
+ },
"funding": [
{
"url": "https://symfony.com/sponsor",
@@ -1073,16 +1128,16 @@
"packages-dev": [
{
"name": "phpstan/phpstan",
- "version": "0.12.46",
+ "version": "0.12.52",
"source": {
"type": "git",
"url": "https://github.com/phpstan/phpstan.git",
- "reference": "9419738e20f0c49757be05d22969c1c44c1dff3b"
+ "reference": "e96dd5e7ae9aefed663bc7e285ad96792b67eadc"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/phpstan/phpstan/zipball/9419738e20f0c49757be05d22969c1c44c1dff3b",
- "reference": "9419738e20f0c49757be05d22969c1c44c1dff3b",
+ "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e96dd5e7ae9aefed663bc7e285ad96792b67eadc",
+ "reference": "e96dd5e7ae9aefed663bc7e285ad96792b67eadc",
"shasum": ""
},
"require": {
@@ -1111,6 +1166,10 @@
"MIT"
],
"description": "PHPStan - PHP Static Analysis Tool",
+ "support": {
+ "issues": "https://github.com/phpstan/phpstan/issues",
+ "source": "https://github.com/phpstan/phpstan/tree/0.12.52"
+ },
"funding": [
{
"url": "https://github.com/ondrejmirtes",
@@ -1125,7 +1184,7 @@
"type": "tidelift"
}
],
- "time": "2020-09-28T09:48:55+00:00"
+ "time": "2020-10-25T07:23:44+00:00"
}
],
"aliases": [],
@@ -1139,5 +1198,5 @@
"ext-curl": "*"
},
"platform-dev": [],
- "plugin-api-version": "1.1.0"
+ "plugin-api-version": "2.0.0"
}
diff --git a/errors/errors.go b/errors/errors.go
index def408d8..c9455367 100755
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -39,21 +39,18 @@ type Kind uint8
// Kinds of errors.
const (
Undefined Kind = iota // Undefined error.
- Network
- Other
- Test
+ ErrWatcherStopped
+ TimeOut
)
func (k Kind) String() string {
switch k {
case Undefined:
return "UNDEF"
- case Network:
- return "Network error"
- case Other:
- return "Other"
- case Test:
- return "Test"
+ case ErrWatcherStopped:
+ return "Watcher stopped"
+ case TimeOut:
+ return "TimedOut"
}
return "unknown error kind"
diff --git a/errors/go.mod b/errors/go.mod
deleted file mode 100755
index 1eaacc23..00000000
--- a/errors/go.mod
+++ /dev/null
@@ -1,3 +0,0 @@
-module github.com/48d90782/errors
-
-go 1.15
diff --git a/pipe_factory.go b/pipe_factory.go
index a6c94614..807d7793 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -84,7 +84,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
}
// todo kill timeout
- errK := w.Kill(ctx)
+ errK := w.Kill()
if errK != nil {
errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
}
@@ -164,8 +164,7 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
errs = append(errs, errF.Error())
}
- // todo kill timeout ??
- errK := w.Kill(context.Background())
+ errK := w.Kill()
if errK != nil {
errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
}
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 9f401bec..2311b7bf 100755
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -61,11 +61,11 @@ func (f *Foo2) Serve() chan error {
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- Supervisor: roadrunner.SupervisorConfig{
+ Supervisor: &roadrunner.SupervisorConfig{
WatchTick: 60,
TTL: 1000,
IdleTTL: 10,
- ExecTTL: time.Second * 10,
+ ExecTTL: 10,
MaxWorkerMemory: 1000,
},
}
diff --git a/pool.go b/pool.go
index bc57bcbd..721b67c1 100755
--- a/pool.go
+++ b/pool.go
@@ -54,6 +54,8 @@ type Pool interface {
// Exec
Exec(rqs Payload) (Payload, error)
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+
// Workers returns worker list associated with the pool.
Workers() (workers []WorkerBase)
@@ -84,7 +86,7 @@ type Config struct {
DestroyTimeout time.Duration
// Supervision config to limit worker and pool memory usage.
- Supervisor SupervisorConfig
+ Supervisor *SupervisorConfig
}
// InitDefaults enables default config values.
@@ -100,22 +102,24 @@ func (cfg *Config) InitDefaults() {
if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
-
+ if cfg.Supervisor == nil {
+ return
+ }
cfg.Supervisor.InitDefaults()
}
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick time.Duration
+ WatchTick uint64
// TTL defines maximum time worker is allowed to live.
- TTL int64
+ TTL uint64
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL int64
+ IdleTTL uint64
// ExecTTL defines maximum lifetime per job.
- ExecTTL time.Duration
+ ExecTTL uint64
// MaxWorkerMemory limits memory per worker.
MaxWorkerMemory uint64
@@ -124,6 +128,6 @@ type SupervisorConfig struct {
// InitDefaults enables default config values.
func (cfg *SupervisorConfig) InitDefaults() {
if cfg.WatchTick == 0 {
- cfg.WatchTick = time.Second
+ cfg.WatchTick = 1
}
}
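
The pool.go hunk above turns Supervisor into a pointer (nil disables supervision) and the supervisor TTL/tick fields into plain uint64 values that the watcher later multiplies by time.Second. A sketch of a config wired the new way, assuming the root import path github.com/spiral/roadrunner/v2 implied by the errors subpackage; the concrete numbers are illustrative:

package main

import (
	"time"

	rr "github.com/spiral/roadrunner/v2"
)

func main() {
	cfg := rr.Config{
		NumWorkers:      4,
		AllocateTimeout: time.Second * 10,
		DestroyTimeout:  time.Second * 10,
		// Supervisor is a pointer now: leaving it nil skips supervision,
		// which is why InitDefaults returns early when it is unset.
		Supervisor: &rr.SupervisorConfig{
			WatchTick:       1,   // seconds between control() passes
			TTL:             100, // max worker lifetime, seconds
			IdleTTL:         10,  // max idle time, seconds
			ExecTTL:         10,  // max wall time per job, seconds
			MaxWorkerMemory: 128, // soft memory limit, MB
		},
	}
	cfg.InitDefaults()
}
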
diff --git a/socket_factory.go b/socket_factory.go
index ed151f2d..6f29db22 100755
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -109,7 +109,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
if err != nil {
err = multierr.Combine(
err,
- w.Kill(context.Background()),
+ w.Kill(),
w.Wait(context.Background()),
)
@@ -158,7 +158,7 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
rl, err := f.findRelay(w)
if err != nil {
errs = append(errs, err.Error())
- err = w.Kill(ctx)
+ err = w.Kill()
if err != nil {
errs = append(errs, err.Error())
}
diff --git a/state.go b/state.go
index 2e36c977..91c29946 100755
--- a/state.go
+++ b/state.go
@@ -43,7 +43,6 @@ const (
StateStopping
StateKilling
- StateKilled
// State of worker, when no need to allocate new one
StateDestroyed
diff --git a/static_pool.go b/static_pool.go
index 4ecbdd41..3af933c3 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -6,9 +6,8 @@ import (
"os/exec"
"sync"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
-
- "github.com/pkg/errors"
)
// StopRequest can be sent by worker to indicate that restart is required.
@@ -34,9 +33,6 @@ type StaticPool struct {
// manages worker states and TTLs
ww *workerWatcher
-
- // supervises memory and TTL of workers
- // sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -74,9 +70,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
return nil, err
}
- // todo: implement
- // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
- // p.sp.Start()
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
return p, nil
}
@@ -101,9 +101,10 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
}
func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
+ const op = errors.Op("Exec")
w, err := p.ww.GetFreeWorker(context.Background())
- if err != nil && errors.Is(err, ErrWatcherStopped) {
- return EmptyPayload, ErrWatcherStopped
+ if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ return EmptyPayload, errors.E(op, err)
} else if err != nil {
return EmptyPayload, err
}
@@ -167,76 +168,73 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
return rsp, nil
}
-// Exec one task with given payload and context, returns result or error.
-// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
-// // todo: why TODO passed here?
-// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
-// defer cancel()
-// w, err := p.ww.GetFreeWorker(getWorkerCtx)
-// if err != nil && errors.Is(err, ErrWatcherStopped) {
-// return EmptyPayload, ErrWatcherStopped
-// } else if err != nil {
-// return EmptyPayload, err
-// }
-//
-// sw := w.(SyncWorker)
-//
-// // todo: implement worker destroy
-// //execCtx context.Context
-// //if p.cfg.Supervisor.ExecTTL != 0 {
-// // var cancel2 context.CancelFunc
-// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
-// // defer cancel2()
-// //} else {
-// // execCtx = ctx
-// //}
-//
-// rsp, err := sw.Exec(rqs)
-// if err != nil {
-// errJ := p.checkMaxJobs(ctx, w)
-// if errJ != nil {
-// // todo: worker was not destroyed
-// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
-// }
-//
-// // soft job errors are allowed
-// if _, jobError := err.(JobError); jobError {
-// p.ww.PushWorker(w)
-// return EmptyPayload, err
-// }
-//
-// sw.State().Set(StateInvalid)
-// errS := w.Stop(ctx)
-// if errS != nil {
-// return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
-// }
-//
-// return EmptyPayload, err
-// }
-//
-// // worker want's to be terminated
-// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-// w.State().Set(StateInvalid)
-// err = w.Stop(ctx)
-// if err != nil {
-// return EmptyPayload, err
-// }
-// return p.ExecWithContext(ctx, rqs)
-// }
-//
-// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-// err = p.ww.AllocateNew(ctx)
-// if err != nil {
-// return EmptyPayload, err
-// }
-// } else {
-// p.muw.Lock()
-// p.ww.PushWorker(w)
-// p.muw.Unlock()
-// }
-//
-// return rsp, nil
-// }
+func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("Exec")
+ w, err := p.ww.GetFreeWorker(context.Background())
+ if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ return EmptyPayload, errors.E(op, err)
+ } else if err != nil {
+ return EmptyPayload, err
+ }
+
+ sw := w.(SyncWorker)
+
+ rsp, err := sw.ExecWithContext(ctx, rqs)
+ if err != nil {
+ // soft job errors are allowed
+ if _, jobError := err.(JobError); jobError {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err := p.ww.AllocateNew(bCtx)
+ if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+ } else {
+ p.ww.PushWorker(w)
+ }
+
+ return EmptyPayload, err
+ }
+
+ sw.State().Set(StateInvalid)
+ p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ errS := w.Stop(bCtx)
+
+ if errS != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ }
+
+ return EmptyPayload, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+
+ return p.Exec(rqs)
+ }
+
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err = p.ww.AllocateNew(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ } else {
+ p.muw.Lock()
+ p.ww.PushWorker(w)
+ p.muw.Unlock()
+ }
+ return rsp, nil
+}
// Destroy all underlying stack (but let them to complete the task).
func (p *StaticPool) Destroy(ctx context.Context) {
diff --git a/static_pool_test.go b/static_pool_test.go
index ec80e92a..8633f9c5 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -235,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
})
// killing random worker and expecting pool to replace it
- err = p.Workers()[0].Kill(ctx)
+ err = p.Workers()[0].Kill()
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 9d1d2b1e..0293ab8b 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -2,8 +2,10 @@ package roadrunner
import (
"context"
+ "sync"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
)
@@ -11,31 +13,100 @@ const MB = 1024 * 1024
type SupervisedPool interface {
Pool
-
- // ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context
- // deadline reached.
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ Start()
}
type supervisedPool struct {
- cfg SupervisorConfig
+ cfg *SupervisorConfig
events *util.EventHandler
pool Pool
stopCh chan struct{}
+ mu *sync.RWMutex
}
-func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
- return &supervisedPool{
+func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool {
+ sp := &supervisedPool{
cfg: cfg,
events: events,
pool: pool,
+ mu: &sync.RWMutex{},
stopCh: make(chan struct{}),
}
+ return sp
+}
+
+type ttlExec struct {
+ err error
+ p Payload
+}
+
+func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("exec_supervised")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: err,
+ p: EmptyPayload,
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return EmptyPayload, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
+func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
+ return sp.pool.Exec(p)
+}
+
+func (sp *supervisedPool) AddListener(listener util.EventListener) {
+ sp.pool.AddListener(listener)
+}
+
+func (sp *supervisedPool) GetConfig() Config {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervisedPool) Workers() (workers []WorkerBase) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error {
+ return sp.pool.RemoveWorker(ctx, worker)
+}
+
+func (sp *supervisedPool) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
}
func (sp *supervisedPool) Start() {
go func() {
- watchTout := time.NewTicker(sp.cfg.WatchTick)
+ watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
for {
select {
case <-sp.stopCh:
@@ -43,7 +114,9 @@ func (sp *supervisedPool) Start() {
return
// stop here
case <-watchTout.C:
+ sp.mu.Lock()
sp.control()
+ sp.mu.Unlock()
}
}
}()
@@ -89,7 +162,7 @@ func (sp *supervisedPool) control() {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
return
} else {
- sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
}
continue
@@ -116,7 +189,7 @@ func (sp *supervisedPool) control() {
res := int64(lu) - now.UnixNano()
// maxWorkerIdle more than diff between now and last used
- if sp.cfg.IdleTTL-res <= 0 {
+ if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
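
supervisedPool.ExecWithContext above implements ExecTTL by wrapping the caller context with a timeout, running the job in a goroutine that reports over a buffered channel, and selecting between that channel and ctx.Done(). The following standalone distillation of the same pattern uses no roadrunner types so the control flow is easy to follow; all names are illustrative:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	body string
	err  error
}

func execWithTTL(ctx context.Context, ttl time.Duration, job func() (string, error)) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, ttl)
	defer cancel()

	c := make(chan result, 1) // buffered so the goroutine never blocks on send
	go func() {
		body, err := job()
		c <- result{body: body, err: err}
	}()

	select {
	case <-ctx.Done():
		// deadline hit: the supervised pool reports this as the TimeOut kind
		return "", errors.New("exec TTL reached: " + ctx.Err().Error())
	case res := <-c:
		return res.body, res.err
	}
}

func main() {
	out, err := execWithTTL(context.Background(), 100*time.Millisecond, func() (string, error) {
		time.Sleep(time.Second) // plays the role of tests/sleep.php
		return "done", nil
	})
	fmt.Println(out, err)
}
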
diff --git a/supervisor_test.go b/supervisor_test.go
new file mode 100644
index 00000000..68c2ddaf
--- /dev/null
+++ b/supervisor_test.go
@@ -0,0 +1,145 @@
+package roadrunner
+
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var cfgSupervised = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 100,
+ MaxWorkerMemory: 100,
+ },
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/memleak.php", "pipes") },
+ NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ stopCh := make(chan struct{})
+ defer p.Destroy(context.Background())
+
+ go func() {
+ for ; ; {
+ select {
+ case <-stopCh:
+ return
+ default:
+ workers := p.Workers()
+ if len(workers) > 0 {
+ s, err := WorkerProcessState(workers[0])
+ assert.NoError(t, err)
+ assert.NotNil(t, s)
+ // since this is soft limit, double max memory limit watch
+ if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
+ t.Fatal("max memory reached")
+ }
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 50)
+ _, err = p.Exec(Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ stopCh <- struct{}{}
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 1,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
+ NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Error(t, err)
+ assert.Empty(t, resp)
+
+ time.Sleep(time.Second * 3)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 4,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/sleep.php", "pipes") },
+ NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp)
+}
diff --git a/sync_worker.go b/sync_worker.go
index d7c15e88..31d68168 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,9 +5,10 @@ import (
"fmt"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
+ "go.uber.org/multierr"
- "github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
@@ -19,6 +20,7 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
+ ExecWithContext(ctx context.Context, p Payload) (Payload, error)
}
type syncWorker struct {
@@ -60,14 +62,82 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
return rsp, nil
}
+type wexec struct {
+ payload Payload
+ err error
+}
+
+// Exec payload without TTL timeout.
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) {
+ const op = errors.Op("exec_with_context")
+ c := make(chan wexec, 1)
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.w.State().Value() != StateReady {
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ if _, ok := err.(JobError); !ok {
+ tw.w.State().Set(StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ c <- wexec{
+ payload: EmptyPayload,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ tw.w.State().Set(StateReady)
+ tw.w.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ return EmptyPayload, multierr.Append(err, ctx.Err())
+ }
+ return EmptyPayload, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return EmptyPayload, res.err
+ }
+ return res.payload, nil
+ }
+}
+
func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
+ const op = errors.Op("exec_payload")
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
- return EmptyPayload, errors.Wrap(err, "header error")
+ return EmptyPayload, errors.E(op, err, "header error")
}
if err := tw.w.Relay().Send(rqs.Body, 0); err != nil {
- return EmptyPayload, errors.Wrap(err, "sender error")
+ return EmptyPayload, errors.E(op, err, "sender error")
}
var pr goridge.Prefix
@@ -75,7 +145,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
var err error
if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
if !pr.HasFlag(goridge.PayloadControl) {
@@ -88,7 +158,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
// add streaming support :)
if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.Wrap(err, "WorkerProcess error")
+ return EmptyPayload, errors.E(op, err, "WorkerProcess error")
}
return rsp, nil
@@ -126,8 +196,8 @@ func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *syncWorker) Kill(ctx context.Context) error {
- return tw.w.Kill(ctx)
+func (tw *syncWorker) Kill() error {
+ return tw.w.Kill()
}
func (tw *syncWorker) Relay() goridge.Relay {
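
The deadline branch added to syncWorker.ExecWithContext above kills the worker process once the context expires and merges the kill error with ctx.Err() via multierr. A self-contained sketch of just that branch; the proc type stands in for the worker process and is not part of the roadrunner API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.uber.org/multierr"
)

// proc is a stand-in for the worker process.
type proc struct{ alive bool }

func (p *proc) Kill() error {
	if !p.alive {
		return errors.New("process already gone")
	}
	p.alive = false
	return nil
}

// execOrKill mirrors the select in syncWorker.ExecWithContext: either the job
// finishes, or the deadline fires, the process is killed and both errors are reported.
func execOrKill(ctx context.Context, p *proc, done <-chan struct{}) error {
	select {
	case <-ctx.Done():
		if err := p.Kill(); err != nil {
			return multierr.Append(err, ctx.Err())
		}
		return ctx.Err()
	case <-done:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(execOrKill(ctx, &proc{alive: true}, make(chan struct{})))
}
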
diff --git a/tests/memleak.php b/tests/memleak.php
new file mode 100644
index 00000000..b78a76c0
--- /dev/null
+++ b/tests/memleak.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require dirname(__DIR__) . "/vendor_php/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+$mem = '';
+while($rr->receive($ctx)){
+ $mem .= str_repeat(" ", 1024*1024);
+ $rr->send("");
+}
\ No newline at end of file
diff --git a/tests/sleep.php b/tests/sleep.php
new file mode 100644
index 00000000..b3ea8235
--- /dev/null
+++ b/tests/sleep.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require dirname(__DIR__) . "/vendor_php/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->receive($ctx)){
+ sleep(3);
+ $rr->send("");
+}
\ No newline at end of file
diff --git a/worker.go b/worker.go
index 2dda51cc..ef532f51 100755
--- a/worker.go
+++ b/worker.go
@@ -74,7 +74,7 @@ type WorkerBase interface {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
- Kill(ctx context.Context) error
+ Kill() error
// Relay returns attached to worker goridge relay
Relay() goridge.Relay
@@ -280,7 +280,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
-func (w *WorkerProcess) Kill(ctx context.Context) error {
+func (w *WorkerProcess) Kill() error {
w.state.Set(StateKilling)
w.mu.Lock()
defer w.mu.Unlock()
diff --git a/worker_test.go b/worker_test.go
index d2744345..78738064 100755
--- a/worker_test.go
+++ b/worker_test.go
@@ -47,7 +47,7 @@ func Test_Kill(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, StateReady, w.State().Value())
- err = w.Kill(ctx)
+ err = w.Kill()
if err != nil {
t.Errorf("error killing the WorkerProcess: error %v", err)
}
diff --git a/worker_watcher.go b/worker_watcher.go
index 25c88a1a..0eb8152b 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -2,14 +2,14 @@ package roadrunner
import (
"context"
- "errors"
"sync"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
)
-var ErrWatcherStopped = errors.New("watcher stopped")
+//var = errors.New("watcher stopped")
type Stack struct {
workers []WorkerBase
@@ -85,11 +85,7 @@ type WorkerWatcher interface {
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func newWorkerWatcher(
- allocator func(args ...interface{}) (WorkerBase, error),
- numWorkers int64,
- events *util.EventHandler,
-) *workerWatcher {
+func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -127,10 +123,11 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
}
func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+ const op = errors.Op("get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
- return nil, ErrWatcherStopped
+ return nil, errors.E(op, errors.ErrWatcherStopped)
}
// handle worker remove state
@@ -146,6 +143,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// no free stack
if w == nil {
+ // TODO allocate timeout
tout := time.NewTicker(time.Second * 180)
defer tout.Stop()
for {
@@ -153,20 +151,20 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
default:
w, stop = ww.stack.Pop()
if stop {
- return nil, ErrWatcherStopped
+ return nil, errors.E(op, errors.ErrWatcherStopped)
}
if w == nil {
continue
}
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
return w, nil
case <-tout.C:
- return nil, errors.New("no free stack")
+ return nil, errors.Str("no free stack")
}
}
}
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
return w, nil
}
@@ -198,10 +196,10 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
// found in the stack
// remove worker
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
wb.State().Set(StateInvalid)
- err := wb.Kill(ctx)
+ err := wb.Kill()
if err != nil {
return err
}
@@ -215,14 +213,19 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
// O(1) operation
func (ww *workerWatcher) PushWorker(w WorkerBase) {
- ww.mutex.Lock()
- ww.actualNumWorkers++
- ww.mutex.Unlock()
+ ww.IncreaseWorkersCount()
ww.stack.Push(w)
}
func (ww *workerWatcher) ReduceWorkersCount() {
- ww.decreaseNumOfActualWorkers()
+ ww.mutex.Lock()
+ ww.actualNumWorkers--
+ ww.mutex.Unlock()
+}
+func (ww *workerWatcher) IncreaseWorkersCount() {
+ ww.mutex.Lock()
+ ww.actualNumWorkers++
+ ww.mutex.Unlock()
}
// Destroy all underlying stack (but let them to complete the task)
@@ -258,9 +261,17 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation
+// Warning, this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) WorkersList() []WorkerBase {
- return ww.stack.workers
+ ww.stack.mutex.Lock()
+ defer ww.stack.mutex.Unlock()
+ workersCopy := make([]WorkerBase, 0, 1)
+ for _, v := range ww.stack.workers {
+ sw := v.(SyncWorker)
+ workersCopy = append(workersCopy, sw)
+ }
+
+ return workersCopy
}
func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
@@ -284,7 +295,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
// worker in the stack, reallocating
if ww.stack.workers[i].Pid() == pid {
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
ww.stack.mutex.Unlock()
err = ww.AllocateNew(ctx)
@@ -321,9 +332,3 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.wait(context.Background(), wb)
}()
}
-
-func (ww *workerWatcher) decreaseNumOfActualWorkers() {
- ww.mutex.Lock()
- ww.actualNumWorkers--
- ww.mutex.Unlock()
-}
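
WorkersList in the hunk above now copies the workers slice while holding the stack mutex instead of handing out the live slice, trading an O(n) copy for race-free reads. A generic sketch of that copy-under-lock choice; the stack type here is illustrative, not the roadrunner Stack:

package main

import (
	"fmt"
	"sync"
)

type stack struct {
	mu    sync.Mutex
	items []string
}

func (s *stack) push(v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, v)
}

// list returns a copy made while the lock is held, so callers never observe
// concurrent mutation of the underlying slice; O(n), as the diff notes.
func (s *stack) list() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]string, len(s.items))
	copy(out, s.items)
	return out
}

func main() {
	s := &stack{}
	s.push("worker-1")
	s.push("worker-2")
	fmt.Println(s.list())
}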