From 7b3d08755cb7be2ceefb500f4f01dfbf85e8b8f7 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 14 Dec 2021 10:48:11 +0300 Subject: proper ww destroy supervised pool ticket stop Signed-off-by: Valery Piashchynski --- pool/supervisor_pool.go | 4 +++- worker_watcher/worker_watcher.go | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 1a94f6a0..a7a1ae52 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -84,15 +84,17 @@ func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { func (sp *supervised) Destroy(ctx context.Context) { sp.pool.Destroy(ctx) + sp.Stop() } func (sp *supervised) Start() { go func() { watchTout := time.NewTicker(sp.cfg.WatchTick) + defer watchTout.Stop() + for { select { case <-sp.stopCh: - watchTout.Stop() return // stop here case <-watchTout.C: diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 6cd01177..e59d9feb 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -236,19 +236,22 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { for { select { case <-tt.C: - ww.Lock() + ww.RLock() // that might be one of the workers is working if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) { - ww.Unlock() + ww.RUnlock() continue } + ww.RUnlock() // All container at this moment are in the container // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.Lock() for i := 0; i < len(ww.workers); i++ { ww.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = ww.workers[i].Kill() } + ww.Unlock() return case <-ctx.Done(): // kill workers -- cgit v1.2.3