summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container/channel/vec.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/container/channel/vec.go')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go53
1 files changed, 53 insertions, 0 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
new file mode 100644
index 00000000..eafbfb07
--- /dev/null
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -0,0 +1,53 @@
+package channel
+
+import (
+ "context"
+ "sync/atomic"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ destroy uint64
+ workers chan worker.BaseProcess
+}
+
+func NewVector(initialNumOfWorkers uint64) *Vec {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ }
+
+ return vec
+}
+
+func (v *Vec) Push(w worker.BaseProcess) {
+ v.workers <- w
+}
+
+func (v *Vec) Remove(_ int64) {}
+
+func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
+ /*
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, errors.E(errors.WatcherStopped)
+ }
+
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}