diff options
author | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
commit | c90c11b92e229280477a9b049e65ca1048825dd4 (patch) | |
tree | 2a38695cad6dc3095b291575cfb40bc56820d86d /pkg/worker_watcher/container | |
parent | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff) |
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel.
Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1.
Replace third-party amqp091 with the official implementation.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 52 |
1 files changed, 49 insertions, 3 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index eafbfb07..c06d05b0 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -2,6 +2,7 @@ package channel import ( "context" + "sync" "sync/atomic" "github.com/spiral/errors" @@ -9,21 +10,62 @@ import ( ) type Vec struct { + sync.RWMutex + // destroy signal destroy uint64 + // channel with the workers workers chan worker.BaseProcess + + len uint64 } -func NewVector(initialNumOfWorkers uint64) *Vec { +func NewVector(len uint64) *Vec { vec := &Vec{ destroy: 0, - workers: make(chan worker.BaseProcess, initialNumOfWorkers), + len: len, + workers: make(chan worker.BaseProcess, len), } return vec } +// Push is O(1) operation +// In case of TTL and full channel O(n) worst case func (v *Vec) Push(w worker.BaseProcess) { - v.workers <- w + // Non-blocking channel send + select { + case v.workers <- w: + // default select branch is only possible when dealing with TTL + // because in that case, workers in the v.workers channel can be TTL-ed and killed + // but presenting in the channel + default: + v.Lock() + defer v.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) + */ + for i := uint64(0); i < v.len; i++ { + wrk := <-v.workers + switch wrk.State().Value() { + // skip good states + case worker.StateWorking, worker.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + v.workers <- wrk + continue + default: + // kill the current worker (just to be sure it's dead) + _ = wrk.Kill() + // replace with the new one + v.workers <- w + return + } + } + } } func (v *Vec) Remove(_ int64) {} @@ -40,6 +82,10 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { return nil, errors.E(errors.WatcherStopped) } + // used only for the TTL-ed workers + v.RLock() + defer v.RUnlock() + select { case w := <-v.workers: return w, nil |