diff options
Diffstat (limited to 'pkg/worker_watcher/container/channel/vec.go')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 51093978..7fb65a92 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) { // because in that case, workers in the v.workers channel can be TTL-ed and killed // but presenting in the channel default: + // Stop Pop operations v.Lock() defer v.Unlock() @@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) { 2. Violated Get <-> Release operation (how ??) */ for i := uint64(0); i < v.len; i++ { + /* + We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. + */ wrk := <-v.workers switch wrk.State().Value() { - // skip good states + // skip good states, put worker back case worker.StateWorking, worker.StateReady: // put the worker back // generally, while send and receive operations are concurrent (from the channel), channel behave // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO v.workers <- wrk continue + /* + Bad states are here. + */ default: // kill the current worker (just to be sure it's dead) - _ = wrk.Kill() - // replace with the new one + if wrk != nil { + _ = wrk.Kill() + } + // replace with the new one and return from the loop + // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker + // But this case will be handled in the worker_watcher::Get v.workers <- w return } @@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { } */ - if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + if atomic.LoadUint64(&v.destroy) == 1 { return nil, errors.E(errors.WatcherStopped) } |