summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/vec.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-26 14:08:33 +0300
committerValery Piashchynski <[email protected]>2021-06-26 14:08:33 +0300
commit53e50a05bd27ecec03695b69defd920fc4a25c5c (patch)
treee86ca391e5a85118098c6340a0f0ae86747db042 /pkg/worker_watcher/container/vec.go
parentad1ca84b26bb6a4ba410a8a684fe3d2e2f86eaea (diff)
parentfc540f6029772ff51913b8ee3c082f8197010c52 (diff)
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
Diffstat (limited to 'pkg/worker_watcher/container/vec.go')
-rw-r--r--pkg/worker_watcher/container/vec.go14
1 files changed, 11 insertions, 3 deletions
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 565b1b69..b9150c43 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -1,8 +1,10 @@
package container
import (
+ "context"
"sync/atomic"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -24,18 +26,24 @@ func (v *Vec) Enqueue(w worker.BaseProcess) {
v.workers <- w
}
-func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
/*
if *addr == old {
*addr = new
return true
}
*/
+
if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
- return nil, true
+ return nil, errors.E(errors.WatcherStopped)
}
- return <-v.workers, false
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
}
func (v *Vec) Destroy() {