summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/channel
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-18 11:32:44 +0300
committerValery Piashchynski <[email protected]>2021-07-18 11:32:44 +0300
commit9c51360f9119a4114bdcc21c8e61f0908a3c876d (patch)
treeea63a051931bbd8282d64478bbefa2f970fcc955 /pkg/worker_watcher/container/channel
parentf4feb30197843d05eb308081ee579d3a9e3d6206 (diff)
Started beanstalk driver. Add new Queue impl (not finished yet).
Fix bugs in the AMQP, update proto-api Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/container/channel')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go53
1 files changed, 53 insertions, 0 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
new file mode 100644
index 00000000..eafbfb07
--- /dev/null
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -0,0 +1,53 @@
+package channel
+
+import (
+ "context"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ destroy uint64
+ workers chan worker.BaseProcess
+}
+
+func NewVector(initialNumOfWorkers uint64) *Vec {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ }
+
+ return vec
+}
+
+func (v *Vec) Push(w worker.BaseProcess) {
+ v.workers <- w
+}
+
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ /*
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, errors.E(errors.WatcherStopped)
+ }
+
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}