summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/vec.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/container/vec.go')
-rw-r--r--pkg/worker_watcher/container/vec.go11
1 files changed, 10 insertions, 1 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 565b1b69..1ab9d073 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -7,12 +7,14 @@ import (
)
type Vec struct {
+ wqLen uint64
destroy uint64
workers chan worker.BaseProcess
}
func NewVector(initialNumOfWorkers uint64) Vector {
vec := &Vec{
+ wqLen: 0,
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
}
@@ -21,6 +23,7 @@ func NewVector(initialNumOfWorkers uint64) Vector {
}
func (v *Vec) Enqueue(w worker.BaseProcess) {
+ atomic.AddUint64(&v.wqLen, 1)
v.workers <- w
}
@@ -31,11 +34,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
return true
}
*/
+
if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
return nil, true
}
- return <-v.workers, false
+ if num := atomic.LoadUint64(&v.wqLen); num > 0 {
+ atomic.AddUint64(&v.wqLen, ^uint64(0))
+ return <-v.workers, false
+ }
+
+ return nil, false
}
func (v *Vec) Destroy() {