diff options
Diffstat (limited to 'pkg/worker_watcher/container/vec.go')
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 565b1b69..1ab9d073 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -7,12 +7,14 @@ import ( ) type Vec struct { + wqLen uint64 destroy uint64 workers chan worker.BaseProcess } func NewVector(initialNumOfWorkers uint64) Vector { vec := &Vec{ + wqLen: 0, destroy: 0, workers: make(chan worker.BaseProcess, initialNumOfWorkers), } @@ -21,6 +23,7 @@ func NewVector(initialNumOfWorkers uint64) Vector { } func (v *Vec) Enqueue(w worker.BaseProcess) { + atomic.AddUint64(&v.wqLen, 1) v.workers <- w } @@ -31,11 +34,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) { return true } */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { return nil, true } - return <-v.workers, false + if num := atomic.LoadUint64(&v.wqLen); num > 0 { + atomic.AddUint64(&v.wqLen, ^uint64(0)) + return <-v.workers, false + } + + return nil, false } func (v *Vec) Destroy() { |