diff options
author | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
commit | ab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch) | |
tree | 0a58b043605ef1d9b09e75b207c236aacb1ed55a /pkg/worker_watcher/container | |
parent | bd0da830ae345e1ed4a67782bf413673beeba037 (diff) |
Update to go 1.17
Add Stat with tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 51093978..7fb65a92 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) { // because in that case, workers in the v.workers channel can be TTL-ed and killed // but presenting in the channel default: + // Stop Pop operations v.Lock() defer v.Unlock() @@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) { 2. Violated Get <-> Release operation (how ??) */ for i := uint64(0); i < v.len; i++ { + /* + We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. + */ wrk := <-v.workers switch wrk.State().Value() { - // skip good states + // skip good states, put worker back case worker.StateWorking, worker.StateReady: // put the worker back // generally, while send and receive operations are concurrent (from the channel), channel behave // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO v.workers <- wrk continue + /* + Bad states are here. + */ default: // kill the current worker (just to be sure it's dead) - _ = wrk.Kill() - // replace with the new one + if wrk != nil { + _ = wrk.Kill() + } + // replace with the new one and return from the loop + // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker + // But this case will be handled in the worker_watcher::Get v.workers <- w return } @@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { } */ - if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + if atomic.LoadUint64(&v.destroy) == 1 { return nil, errors.E(errors.WatcherStopped) } |