diff options
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 8 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 14 |
2 files changed, 17 insertions, 5 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go index 532bace9..e10ecdae 100644 --- a/pkg/worker_watcher/container/interface.go +++ b/pkg/worker_watcher/container/interface.go @@ -1,13 +1,17 @@ package container -import "github.com/spiral/roadrunner/v2/pkg/worker" +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) // Vector interface represents vector container type Vector interface { // Enqueue used to put worker to the vector Enqueue(worker.BaseProcess) // Dequeue used to get worker from the vector - Dequeue() (worker.BaseProcess, bool) + Dequeue(ctx context.Context) (worker.BaseProcess, error) // Destroy used to stop releasing the workers Destroy() } diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 565b1b69..b9150c43 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -1,8 +1,10 @@ package container import ( + "context" "sync/atomic" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -24,18 +26,24 @@ func (v *Vec) Enqueue(w worker.BaseProcess) { v.workers <- w } -func (v *Vec) Dequeue() (worker.BaseProcess, bool) { +func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { /* if *addr == old { *addr = new return true } */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { - return nil, true + return nil, errors.E(errors.WatcherStopped) } - return <-v.workers, false + select { + case w := <-v.workers: + return w, nil + case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) + } } func (v *Vec) Destroy() { |