diff options
Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-x | worker_watcher/worker_watcher.go | 61 |
1 files changed, 59 insertions, 2 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index e59d9feb..cfde9931 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -82,7 +82,6 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { // Take is not a thread safe operation func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") - // thread safe operation w, err := ww.container.Pop(ctx) if err != nil { @@ -222,6 +221,59 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) { } } +func (ww *workerWatcher) Reset(ctx context.Context) { + // destroy container, we don't use ww mutex here, since we should be able to push worker + ww.Lock() + // do not release new workers + ww.container.Destroy() + ww.Unlock() + + tt := time.NewTicker(time.Millisecond * 10) + defer tt.Stop() + for { + select { + case <-tt.C: + ww.RLock() + // that might be one of the workers is working + if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) { + ww.RUnlock() + continue + } + ww.RUnlock() + // All container at this moment are in the container + // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.Lock() + + // drain channel + _, _ = ww.container.Pop(ctx) + for i := 0; i < len(ww.workers); i++ { + ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = ww.workers[i].Kill() + } + + ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers)) + ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers)) + ww.Unlock() + return + case <-ctx.Done(): + // drain channel + _, _ = ww.container.Pop(ctx) + // kill workers + ww.Lock() + for i := 0; i < len(ww.workers); i++ { + ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = ww.workers[i].Kill() + } + + ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers)) + ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers)) + ww.Unlock() + } + } +} + // Destroy all underlying container (but let them complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { // destroy container, we don't use ww mutex here, since we should be able to push worker @@ -231,7 +283,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { ww.Unlock() ww.events.Unsubscribe(ww.eventsID) - tt := time.NewTicker(time.Millisecond * 100) + tt := time.NewTicker(time.Millisecond * 10) defer tt.Stop() for { select { @@ -246,6 +298,8 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { // All container at this moment are in the container // Pop operation is blocked, push can't be done, since it's not possible to pop ww.Lock() + // drain channel + _, _ = ww.container.Pop(ctx) for i := 0; i < len(ww.workers); i++ { ww.workers[i].State().Set(worker.StateDestroyed) // kill the worker @@ -254,10 +308,13 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { ww.Unlock() return case <-ctx.Done(): + // drain channel + _, _ = ww.container.Pop(ctx) // kill workers ww.Lock() for i := 0; i < len(ww.workers); i++ { ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker _ = ww.workers[i].Kill() } ww.Unlock() |