Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-x  worker_watcher/worker_watcher.go | 61 +++++++++++++++++++++++++---
1 file changed, 59 insertions(+), 2 deletions(-)
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index e59d9feb..cfde9931 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -82,7 +82,6 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
// Take is not a thread-safe operation
func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
-
// thread-safe operation
w, err := ww.container.Pop(ctx)
if err != nil {
@@ -222,6 +221,59 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
}
}
+func (ww *workerWatcher) Reset(ctx context.Context) {
+ // destroy the container under a short lock; don't hold it afterwards, since workers must still be pushed back
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 10)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ ww.RLock()
+ // some workers might still be busy
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
+ ww.RUnlock()
+ continue
+ }
+ ww.RUnlock()
+ // all workers at this moment are in the container
+ // the Pop operation is blocked, and Push can't happen since it's not possible to pop
+ ww.Lock()
+
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+
+ ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers))
+ ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers))
+ ww.Unlock()
+ return
+ case <-ctx.Done():
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
+ // kill workers
+ ww.Lock()
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+
+ ww.workers = make([]worker.BaseProcess, 0, atomic.LoadUint64(ww.numWorkers))
+ ww.container = channel.NewVector(atomic.LoadUint64(ww.numWorkers))
+ ww.Unlock()
+ }
+ }
+}
+
// Destroy all underlying workers (but let them complete the current task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
// destroy the container under a short lock; don't hold it afterwards, since workers must still be pushed back
@@ -231,7 +283,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.Unlock()
ww.events.Unsubscribe(ww.eventsID)
- tt := time.NewTicker(time.Millisecond * 100)
+ tt := time.NewTicker(time.Millisecond * 10)
defer tt.Stop()
for {
select {
@@ -246,6 +298,8 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
// all workers at this moment are in the container
// the Pop operation is blocked, and Push can't happen since it's not possible to pop
ww.Lock()
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
@@ -254,10 +308,13 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.Unlock()
return
case <-ctx.Done():
+ // drain channel
+ _, _ = ww.container.Pop(ctx)
// kill workers
ww.Lock()
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
_ = ww.workers[i].Kill()
}
ww.Unlock()
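
For reference, below is a minimal, self-contained sketch of the 10ms-ticker wait-for-idle pattern that Reset and the patched Destroy share: poll until the atomic worker count matches the number of workers returned to the container, then take the write lock and tear everything down, with ctx cancellation as the forced path. The pool and Worker types here are hypothetical stand-ins for illustration only, not the real worker_watcher types.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Worker struct{ id int }

// pool is a hypothetical stand-in for workerWatcher: numWorkers is the
// expected worker count, workers holds the workers currently returned.
type pool struct {
	sync.RWMutex
	numWorkers uint64
	workers    []*Worker
}

// waitForIdle polls every 10ms until every worker is back in the pool,
// then runs fn under the write lock. On ctx expiry it runs fn anyway,
// mirroring the forced-kill branch of Reset/Destroy.
func (p *pool) waitForIdle(ctx context.Context, fn func()) {
	tt := time.NewTicker(10 * time.Millisecond)
	defer tt.Stop()
	for {
		select {
		case <-tt.C:
			p.RLock()
			idle := atomic.LoadUint64(&p.numWorkers) == uint64(len(p.workers))
			p.RUnlock()
			if !idle {
				continue // some workers are still busy
			}
			p.Lock()
			fn()
			p.Unlock()
			return
		case <-ctx.Done():
			p.Lock()
			fn() // deadline reached: proceed regardless of busy workers
			p.Unlock()
			return
		}
	}
}

func main() {
	p := &pool{numWorkers: 2, workers: []*Worker{{id: 1}, {id: 2}}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	p.waitForIdle(ctx, func() {
		for _, w := range p.workers {
			fmt.Println("killing worker", w.id)
		}
		p.workers = p.workers[:0] // start over with an empty slice, as Reset does
	})
}

Dropping the poll interval from 100ms to 10ms, as the Destroy hunk above does, mainly shortens shutdown latency; the added pre-Lock drain of container.Pop appears intended to unblock any consumer parked in Pop before the final lock is taken.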