summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-14 10:48:11 +0300
committerValery Piashchynski <[email protected]>2021-12-14 10:48:11 +0300
commit25acfc64865c6069df7e1911b508fa0ba9c3324a (patch)
treeceedb5c6da7975052172209032d21e79eb38c89c
parente273e2b3fe086450dd58d353ddde0ccce6b146cc (diff)
proper ww destroy
supervised pool ticket stop Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xpool/supervisor_pool.go4
-rwxr-xr-xworker_watcher/worker_watcher.go7
2 files changed, 8 insertions, 3 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 1a94f6a0..a7a1ae52 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -84,15 +84,17 @@ func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
func (sp *supervised) Destroy(ctx context.Context) {
sp.pool.Destroy(ctx)
+ sp.Stop()
}
func (sp *supervised) Start() {
go func() {
watchTout := time.NewTicker(sp.cfg.WatchTick)
+ defer watchTout.Stop()
+
for {
select {
case <-sp.stopCh:
- watchTout.Stop()
return
// stop here
case <-watchTout.C:
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 6cd01177..e59d9feb 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -236,19 +236,22 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
for {
select {
case <-tt.C:
- ww.Lock()
+ ww.RLock()
// that might be one of the workers is working
if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
- ww.Unlock()
+ ww.RUnlock()
continue
}
+ ww.RUnlock()
// All container at this moment are in the container
// Pop operation is blocked, push can't be done, since it's not possible to pop
+ ww.Lock()
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = ww.workers[i].Kill()
}
+ ww.Unlock()
return
case <-ctx.Done():
// kill workers