diff options
author | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
commit | c90c11b92e229280477a9b049e65ca1048825dd4 (patch) | |
tree | 2a38695cad6dc3095b291575cfb40bc56820d86d /pkg/worker_watcher | |
parent | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff) |
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel.
Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1.
Replace third-party amqp091 with the official implementation.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 52 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 |
2 files changed, 55 insertions, 5 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index eafbfb07..c06d05b0 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -2,6 +2,7 @@ package channel import ( "context" + "sync" "sync/atomic" "github.com/spiral/errors" @@ -9,21 +10,62 @@ import ( ) type Vec struct { + sync.RWMutex + // destroy signal destroy uint64 + // channel with the workers workers chan worker.BaseProcess + + len uint64 } -func NewVector(initialNumOfWorkers uint64) *Vec { +func NewVector(len uint64) *Vec { vec := &Vec{ destroy: 0, - workers: make(chan worker.BaseProcess, initialNumOfWorkers), + len: len, + workers: make(chan worker.BaseProcess, len), } return vec } +// Push is O(1) operation +// In case of TTL and full channel O(n) worst case func (v *Vec) Push(w worker.BaseProcess) { - v.workers <- w + // Non-blocking channel send + select { + case v.workers <- w: + // default select branch is only possible when dealing with TTL + // because in that case, workers in the v.workers channel can be TTL-ed and killed + // but presenting in the channel + default: + v.Lock() + defer v.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) + */ + for i := uint64(0); i < v.len; i++ { + wrk := <-v.workers + switch wrk.State().Value() { + // skip good states + case worker.StateWorking, worker.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + v.workers <- wrk + continue + default: + // kill the current worker (just to be sure it's dead) + _ = wrk.Kill() + // replace with the new one + v.workers <- w + return + } + } + } } func (v *Vec) Remove(_ int64) {} @@ -40,6 +82,10 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { return nil, errors.E(errors.WatcherStopped) } + // used only for the TTL-ed workers + v.RLock() + defer v.RUnlock() + select { case w := <-v.workers: return w, nil diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index ca026383..348be199 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -29,7 +29,7 @@ type Vector interface { type workerWatcher struct { sync.RWMutex container Vector - // used to control the Destroy stage (that all workers are in the container) + // used to control Destroy stage (that all workers are in the container) numWorkers uint64 workers []worker.BaseProcess @@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } + // remove worker + ww.Remove(w) + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } + // set state as stopped w.State().Set(worker.StateStopped) - ww.Remove(w) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ |