diff options
author | Valery Piashchynski <[email protected]> | 2021-06-26 00:10:45 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-26 00:10:45 +0300 |
commit | 7c0b63a9d0cee75e8bdedd2dcd0c20139f3ffd3f (patch) | |
tree | dbc09a665fb75d51f4b01a83cd80b7f2ddab0aae /pkg/worker_watcher/container | |
parent | ae58af3ca1d37cb61f106146e4bf9bd1d033e8b3 (diff) |
- Better channel handling
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 4 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 21 |
2 files changed, 14 insertions, 11 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go index 0e648d34..e10ecdae 100644 --- a/pkg/worker_watcher/container/interface.go +++ b/pkg/worker_watcher/container/interface.go @@ -1,6 +1,8 @@ package container import ( + "context" + "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -9,7 +11,7 @@ type Vector interface { // Enqueue used to put worker to the vector Enqueue(worker.BaseProcess) // Dequeue used to get worker from the vector - Dequeue() (worker.BaseProcess, bool) + Dequeue(ctx context.Context) (worker.BaseProcess, error) // Destroy used to stop releasing the workers Destroy() } diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 1ab9d073..8072af10 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -1,20 +1,20 @@ package container import ( + "context" "sync/atomic" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/worker" ) type Vec struct { - wqLen uint64 destroy uint64 workers chan worker.BaseProcess } func NewVector(initialNumOfWorkers uint64) Vector { vec := &Vec{ - wqLen: 0, destroy: 0, workers: make(chan worker.BaseProcess, initialNumOfWorkers), } @@ -23,11 +23,10 @@ func NewVector(initialNumOfWorkers uint64) Vector { } func (v *Vec) Enqueue(w worker.BaseProcess) { - atomic.AddUint64(&v.wqLen, 1) v.workers <- w } -func (v *Vec) Dequeue() (worker.BaseProcess, bool) { +func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { /* if *addr == old { *addr = new @@ -36,15 +35,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) { */ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { - return nil, true + return nil, errors.E(errors.WatcherStopped) } - if num := atomic.LoadUint64(&v.wqLen); num > 0 { - atomic.AddUint64(&v.wqLen, ^uint64(0)) - return <-v.workers, false + for { + select { + case w := <-v.workers: + return w, nil + case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) + } } - - return nil, false } func (v *Vec) Destroy() { |