summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/channel/vec.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/container/channel/vec.go')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go19
1 files changed, 15 insertions, 4 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 51093978..7fb65a92 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but presenting in the channel
default:
+ // Stop Pop operations
v.Lock()
defer v.Unlock()
@@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) {
2. Violated Get <-> Release operation (how ??)
*/
for i := uint64(0); i < v.len; i++ {
+ /*
+ We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
+ */
wrk := <-v.workers
switch wrk.State().Value() {
- // skip good states
+ // skip good states, put worker back
case worker.StateWorking, worker.StateReady:
// put the worker back
// generally, while send and receive operations are concurrent (from the channel), channel behave
// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
v.workers <- wrk
continue
+ /*
+ Bad states are here.
+ */
default:
// kill the current worker (just to be sure it's dead)
- _ = wrk.Kill()
- // replace with the new one
+ if wrk != nil {
+ _ = wrk.Kill()
+ }
+ // replace with the new one and return from the loop
+ // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
+ // But this case will be handled in the worker_watcher::Get
v.workers <- w
return
}
@@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
}
*/
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ if atomic.LoadUint64(&v.destroy) == 1 {
return nil, errors.E(errors.WatcherStopped)
}