summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go52
1 files changed, 49 insertions, 3 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index eafbfb07..c06d05b0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -2,6 +2,7 @@ package channel
import (
"context"
+ "sync"
"sync/atomic"
"github.com/spiral/errors"
@@ -9,21 +10,62 @@ import (
)
type Vec struct {
+ sync.RWMutex
+ // destroy signal
destroy uint64
+ // channel with the workers
workers chan worker.BaseProcess
+
+ len uint64
}
-func NewVector(initialNumOfWorkers uint64) *Vec {
+func NewVector(len uint64) *Vec {
vec := &Vec{
destroy: 0,
- workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ len: len,
+ workers: make(chan worker.BaseProcess, len),
}
return vec
}
+// Push is O(1) operation
+// In case of TTL and full channel O(n) worst case
func (v *Vec) Push(w worker.BaseProcess) {
- v.workers <- w
+ // Non-blocking channel send
+ select {
+ case v.workers <- w:
+ // default select branch is only possible when dealing with TTL
+ // because in that case, workers in the v.workers channel can be TTL-ed and killed
+ // but presenting in the channel
+ default:
+ v.Lock()
+ defer v.Unlock()
+
+ /*
+ we can be in the default branch by the following reasons:
+ 1. TTL is set with no requests during the TTL
+ 2. Violated Get <-> Release operation (how ??)
+ */
+ for i := uint64(0); i < v.len; i++ {
+ wrk := <-v.workers
+ switch wrk.State().Value() {
+ // skip good states
+ case worker.StateWorking, worker.StateReady:
+ // put the worker back
+ // generally, while send and receive operations are concurrent (from the channel), channel behave
+ // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
+ v.workers <- wrk
+ continue
+ default:
+ // kill the current worker (just to be sure it's dead)
+ _ = wrk.Kill()
+ // replace with the new one
+ v.workers <- w
+ return
+ }
+ }
+ }
}
func (v *Vec) Remove(_ int64) {}
@@ -40,6 +82,10 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
return nil, errors.E(errors.WatcherStopped)
}
+ // used only for the TTL-ed workers
+ v.RLock()
+ defer v.RUnlock()
+
select {
case w := <-v.workers:
return w, nil