summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
committerValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
commitc90c11b92e229280477a9b049e65ca1048825dd4 (patch)
tree2a38695cad6dc3095b291575cfb40bc56820d86d /pkg/worker_watcher
parent1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff)
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go52
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
2 files changed, 55 insertions, 5 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index eafbfb07..c06d05b0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -2,6 +2,7 @@ package channel
import (
"context"
+ "sync"
"sync/atomic"
"github.com/spiral/errors"
@@ -9,21 +10,62 @@ import (
)
type Vec struct {
+ sync.RWMutex
+ // destroy signal
destroy uint64
+ // channel with the workers
workers chan worker.BaseProcess
+
+ len uint64
}
-func NewVector(initialNumOfWorkers uint64) *Vec {
+func NewVector(len uint64) *Vec {
vec := &Vec{
destroy: 0,
- workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ len: len,
+ workers: make(chan worker.BaseProcess, len),
}
return vec
}
+// Push is O(1) operation
+// In case of TTL and full channel O(n) worst case
func (v *Vec) Push(w worker.BaseProcess) {
- v.workers <- w
+ // Non-blocking channel send
+ select {
+ case v.workers <- w:
+ // default select branch is only possible when dealing with TTL
+ // because in that case, workers in the v.workers channel can be TTL-ed and killed
+ // but presenting in the channel
+ default:
+ v.Lock()
+ defer v.Unlock()
+
+ /*
+ we can be in the default branch by the following reasons:
+ 1. TTL is set with no requests during the TTL
+ 2. Violated Get <-> Release operation (how ??)
+ */
+ for i := uint64(0); i < v.len; i++ {
+ wrk := <-v.workers
+ switch wrk.State().Value() {
+ // skip good states
+ case worker.StateWorking, worker.StateReady:
+ // put the worker back
+ // generally, while send and receive operations are concurrent (from the channel), channel behave
+ // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
+ v.workers <- wrk
+ continue
+ default:
+ // kill the current worker (just to be sure it's dead)
+ _ = wrk.Kill()
+ // replace with the new one
+ v.workers <- w
+ return
+ }
+ }
+ }
}
func (v *Vec) Remove(_ int64) {}
@@ -40,6 +82,10 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
return nil, errors.E(errors.WatcherStopped)
}
+ // used only for the TTL-ed workers
+ v.RLock()
+ defer v.RUnlock()
+
select {
case w := <-v.workers:
return w, nil
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index ca026383..348be199 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -29,7 +29,7 @@ type Vector interface {
type workerWatcher struct {
sync.RWMutex
container Vector
- // used to control the Destroy stage (that all workers are in the container)
+ // used to control Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
+ // remove worker
+ ww.Remove(w)
+
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
+ // set state as stopped
w.State().Set(worker.StateStopped)
- ww.Remove(w)
+
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{