summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/vec.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/container/vec.go')
-rw-r--r--pkg/worker_watcher/container/vec.go21
1 files changed, 11 insertions, 10 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 1ab9d073..8072af10 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -1,20 +1,20 @@
package container
import (
+ "context"
"sync/atomic"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
type Vec struct {
- wqLen uint64
destroy uint64
workers chan worker.BaseProcess
}
func NewVector(initialNumOfWorkers uint64) Vector {
vec := &Vec{
- wqLen: 0,
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
}
@@ -23,11 +23,10 @@ func NewVector(initialNumOfWorkers uint64) Vector {
}
func (v *Vec) Enqueue(w worker.BaseProcess) {
- atomic.AddUint64(&v.wqLen, 1)
v.workers <- w
}
-func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
/*
if *addr == old {
*addr = new
@@ -36,15 +35,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
*/
if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
- return nil, true
+ return nil, errors.E(errors.WatcherStopped)
}
- if num := atomic.LoadUint64(&v.wqLen); num > 0 {
- atomic.AddUint64(&v.wqLen, ^uint64(0))
- return <-v.workers, false
+ for {
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
}
-
- return nil, false
}
func (v *Vec) Destroy() {