summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-14 13:59:15 +0300
committerGitHub <[email protected]>2021-12-14 13:59:15 +0300
commitf2c79017ae5759256b03ec58b608f298a29e4b96 (patch)
treed4d203806a13e6efe251d9c1a91f657a224207b5
parente273e2b3fe086450dd58d353ddde0ccce6b146cc (diff)
parent8c789bec56646342cc16818ef6c2f646aaa679f5 (diff)
Merge pull request #870 from spiral/bug/supervisor-pool-timer-leak
[#870]: bug(pool,ww): unstopped ticker causes a memory leak
-rw-r--r--.github/workflows/linux.yml2
-rw-r--r--CHANGELOG.md7
-rwxr-xr-xpool/supervisor_pool.go10
-rwxr-xr-xworker/sync_worker.go28
-rwxr-xr-xworker_watcher/worker_watcher.go7
5 files changed, 33 insertions, 21 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 49a5993c..73d94462 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -27,7 +27,7 @@ jobs:
fail-fast: true
matrix:
php: ["7.4", "8.0", "8.1"]
- go: ["1.17.4"]
+ go: ["1.17.5"]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d0740ad5..b2f28637 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
# CHANGELOG
+# v2.6.1 (14.12.2021)
+
+## 🩹 Fixes:
+
+- 🐛 Fix: memory leak when supervised static pool used. [PR](https://github.com/spiral/roadrunner/pull/870).
+
+
# v2.6.0 (30.11.2021)
### 👀 New:
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 1a94f6a0..3dbae417 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -54,12 +54,16 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
+ sp.mu.RLock()
+ defer sp.mu.RUnlock()
return sp.pool.Exec(rqs)
}
ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
defer cancel()
+ sp.mu.RLock()
+ defer sp.mu.RUnlock()
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
return nil, errors.E(op, err)
@@ -83,16 +87,20 @@ func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
}
func (sp *supervised) Destroy(ctx context.Context) {
+ sp.Stop()
+ sp.mu.Lock()
sp.pool.Destroy(ctx)
+ sp.mu.Unlock()
}
func (sp *supervised) Start() {
go func() {
watchTout := time.NewTicker(sp.cfg.WatchTick)
+ defer watchTout.Stop()
+
for {
select {
case <-sp.stopCh:
- watchTout.Stop()
return
// stop here
case <-watchTout.C:
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
index 12937eac..81d8c5bf 100755
--- a/worker/sync_worker.go
+++ b/worker/sync_worker.go
@@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
- go func() {
- if len(p.Body) == 0 && len(p.Context) == 0 {
- c <- wexec{
- err: errors.E(op, errors.Str("payload can not be empty")),
- }
- return
- }
-
- if tw.process.State().Value() != StateReady {
- c <- wexec{
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
- }
- return
- }
+ // worker was killed before it started to work (supervisor)
+ if tw.process.State().Value() != StateReady {
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ }
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
+ }
+ go func() {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 6cd01177..e59d9feb 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -236,19 +236,22 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
for {
select {
case <-tt.C:
- ww.Lock()
+ ww.RLock()
// that might be one of the workers is working
if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
- ww.Unlock()
+ ww.RUnlock()
continue
}
+ ww.RUnlock()
// All container at this moment are in the container
// Pop operation is blocked, push can't be done, since it's not possible to pop
+ ww.Lock()
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = ww.workers[i].Kill()
}
+ ww.Unlock()
return
case <-ctx.Done():
// kill workers