author     Valery Piashchynski <[email protected]>   2021-12-14 13:59:15 +0300
committer  GitHub <[email protected]>                2021-12-14 13:59:15 +0300
commit     f2c79017ae5759256b03ec58b608f298a29e4b96 (patch)
tree       d4d203806a13e6efe251d9c1a91f657a224207b5
parent     e273e2b3fe086450dd58d353ddde0ccce6b146cc (diff)
parent     8c789bec56646342cc16818ef6c2f646aaa679f5 (diff)
Merge pull request #870 from spiral/bug/supervisor-pool-timer-leak
[#870]: bug(pool,ww): unstopped ticker causes a memory leak
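The bug in a nutshell: supervised.Start() launches a watcher goroutine driven by a time.Ticker, but Destroy() never signaled that goroutine to stop, so every destroyed pool left behind a running goroutine and a live ticker whose runtime timer is only released by Stop(). Below is a minimal sketch of the before/after shape; it assumes a stop channel closed by Stop(), and the names are illustrative, not RoadRunner's actual types.

package main

import "time"

// watch mirrors the supervisor's watcher loop. Before the patch the
// ticker was stopped only on the stop-channel branch; the patched code
// uses defer, so every exit path releases the runtime timer.
func watch(stopCh <-chan struct{}, tick time.Duration) {
	t := time.NewTicker(tick)
	defer t.Stop() // the fix: release the ticker no matter how we return
	for {
		select {
		case <-stopCh:
			return
		case <-t.C:
			// ... TTL / idle / max-memory checks would run here ...
		}
	}
}

type supervisor struct {
	stopCh chan struct{}
}

func (s *supervisor) Start() { go watch(s.stopCh, time.Second) }
func (s *supervisor) Stop()  { close(s.stopCh) }

// Destroy mirrors the patched supervised.Destroy: before the fix it did
// not call Stop, so the watcher goroutine and its ticker outlived the
// pool, which is the leak reported in #870.
func (s *supervisor) Destroy() {
	s.Stop()
	// ... destroy the underlying worker pool ...
}

func main() {
	s := &supervisor{stopCh: make(chan struct{})}
	s.Start()
	time.Sleep(100 * time.Millisecond)
	s.Destroy() // the watcher exits and the ticker is stopped
}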
Diffstat:
-rw-r--r--  .github/workflows/linux.yml        2
-rw-r--r--  CHANGELOG.md                       7
-rwxr-xr-x  pool/supervisor_pool.go           10
-rwxr-xr-x  worker/sync_worker.go             28
-rwxr-xr-x  worker_watcher/worker_watcher.go   7

5 files changed, 33 insertions, 21 deletions
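Beyond the ticker itself, the hunks below also make pool teardown safe against in-flight requests: Exec now holds sp.mu.RLock() for the duration of a call, while Destroy takes the write lock, so destruction waits for every running request and no new one can slip in. Here is a minimal model of that read/write-lock choreography; the pool type and its fields are illustrative, not the actual RoadRunner structs.

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool models the supervised pool: many readers (Exec) may run
// concurrently; Destroy is the single writer that must wait for them.
type pool struct {
	mu     sync.RWMutex
	closed bool
}

// Exec holds the read lock for the duration of a request, the same way
// the patched supervised.Exec wraps sp.pool.Exec.
func (p *pool) Exec(id int) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return fmt.Errorf("pool is destroyed")
	}
	time.Sleep(50 * time.Millisecond) // simulate work
	fmt.Printf("request %d served\n", id)
	return nil
}

// Destroy takes the write lock: it blocks until all in-flight Exec
// calls release their read locks, then flips the pool to closed.
func (p *pool) Destroy() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	fmt.Println("pool destroyed")
}

func main() {
	p := &pool{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) { defer wg.Done(); _ = p.Exec(id) }(i)
	}
	time.Sleep(10 * time.Millisecond)
	p.Destroy() // waits for the requests above to release their read locks
	wg.Wait()
}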
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 49a5993c..73d94462 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -27,7 +27,7 @@ jobs:
       fail-fast: true
       matrix:
         php: ["7.4", "8.0", "8.1"]
-        go: ["1.17.4"]
+        go: ["1.17.5"]
         os: ["ubuntu-latest"]
     steps:
       - name: Set up Go ${{ matrix.go }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d0740ad5..b2f28637 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 # CHANGELOG
 
+# v2.6.1 (14.12.2021)
+
+## 🩹 Fixes:
+
+- 🐛 Fix: memory leak when supervised static pool used. [PR](https://github.com/spiral/roadrunner/pull/870).
+
+
 # v2.6.0 (30.11.2021)
 
 ### 👀 New:
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 1a94f6a0..3dbae417 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -54,12 +54,16 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
 func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
 	const op = errors.Op("supervised_exec_with_context")
 	if sp.cfg.ExecTTL == 0 {
+		sp.mu.RLock()
+		defer sp.mu.RUnlock()
 		return sp.pool.Exec(rqs)
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
 	defer cancel()
 
+	sp.mu.RLock()
+	defer sp.mu.RUnlock()
 	res, err := sp.pool.execWithTTL(ctx, rqs)
 	if err != nil {
 		return nil, errors.E(op, err)
@@ -83,16 +87,20 @@ func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
 }
 
 func (sp *supervised) Destroy(ctx context.Context) {
+	sp.Stop()
+	sp.mu.Lock()
 	sp.pool.Destroy(ctx)
+	sp.mu.Unlock()
 }
 
 func (sp *supervised) Start() {
 	go func() {
 		watchTout := time.NewTicker(sp.cfg.WatchTick)
+		defer watchTout.Stop()
+
 		for {
 			select {
 			case <-sp.stopCh:
-				watchTout.Stop()
 				return
 			// stop here
 			case <-watchTout.C:
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
index 12937eac..81d8c5bf 100755
--- a/worker/sync_worker.go
+++ b/worker/sync_worker.go
@@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (
 	const op = errors.Op("sync_worker_exec_worker_with_timeout")
 	c := make(chan wexec, 1)
 
-	go func() {
-		if len(p.Body) == 0 && len(p.Context) == 0 {
-			c <- wexec{
-				err: errors.E(op, errors.Str("payload can not be empty")),
-			}
-			return
-		}
-
-		if tw.process.State().Value() != StateReady {
-			c <- wexec{
-				err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
-			}
-			return
-		}
+	// worker was killed before it started to work (supervisor)
+	if tw.process.State().Value() != StateReady {
+		return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+	}
+	// set last used time
+	tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+	tw.process.State().Set(StateWorking)
 
-		// set last used time
-		tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
-		tw.process.State().Set(StateWorking)
+	if len(p.Body) == 0 && len(p.Context) == 0 {
+		return nil, errors.E(op, errors.Str("payload can not be empty"))
+	}
 
+	go func() {
 		rsp, err := tw.execPayload(p)
 		if err != nil {
 			// just to be more verbose
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 6cd01177..e59d9feb 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -236,19 +236,22 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
 	for {
 		select {
 		case <-tt.C:
-			ww.Lock()
+			ww.RLock()
 			// that might be one of the workers is working
 			if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
-				ww.Unlock()
+				ww.RUnlock()
 				continue
 			}
+			ww.RUnlock()
 			// All container at this moment are in the container
 			// Pop operation is blocked, push can't be done, since it's not possible to pop
+			ww.Lock()
 			for i := 0; i < len(ww.workers); i++ {
 				ww.workers[i].State().Set(worker.StateDestroyed)
 				// kill the worker
 				_ = ww.workers[i].Kill()
 			}
+			ww.Unlock()
 			return
 		case <-ctx.Done():
 			// kill workers
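One more detail of the patch worth calling out: in sync_worker.go, the payload and worker-state checks moved out of the spawned goroutine, so an invalid request now fails synchronously instead of paying for a goroutine spawn and a channel round-trip just to carry the error back. Below is a simplified model of the two shapes; the result type and function names are invented for illustration, and the real code additionally races the channel against a context deadline.

package main

import (
	"errors"
	"fmt"
)

type result struct {
	body []byte
	err  error
}

// execBefore models the old ExecWithTTL shape: validation happened
// inside the goroutine, so even an empty payload cost a goroutine and
// a channel send just to report the error.
func execBefore(body []byte) ([]byte, error) {
	c := make(chan result, 1)
	go func() {
		if len(body) == 0 {
			c <- result{err: errors.New("payload can not be empty")}
			return
		}
		c <- result{body: body} // real work would happen here
	}()
	r := <-c
	return r.body, r.err
}

// execAfter models the patched shape: cheap precondition checks run
// synchronously, and the goroutine starts only for valid requests.
func execAfter(body []byte) ([]byte, error) {
	if len(body) == 0 {
		return nil, errors.New("payload can not be empty")
	}
	c := make(chan result, 1)
	go func() {
		c <- result{body: body} // real work would happen here
	}()
	r := <-c
	return r.body, r.err
}

func main() {
	if _, err := execBefore(nil); err != nil {
		fmt.Println("old shape, error via channel:", err)
	}
	if _, err := execAfter(nil); err != nil {
		fmt.Println("new shape, rejected synchronously:", err)
	}
	if b, _ := execAfter([]byte("ping")); b != nil {
		fmt.Println("served:", string(b))
	}
}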