diff options
author | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
commit | c90c11b92e229280477a9b049e65ca1048825dd4 (patch) | |
tree | 2a38695cad6dc3095b291575cfb40bc56820d86d | |
parent | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff) |
Rework the channel-based vec. Use a select statement with a default
branch to handle dead workers inside the channel.
Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1.
Replace the third-party AMQP 0-9-1 client (streadway/amqp) with the official implementation (rabbitmq/amqp091-go).
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 3 | ||||
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 52 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/listener.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 6 | ||||
-rw-r--r-- | tests/env/Dockerfile-beanstalkd.yaml | 8 | ||||
-rw-r--r-- | tests/env/docker-compose.yaml | 9 |
13 files changed, 91 insertions, 43 deletions
@@ -26,13 +26,13 @@ require ( github.com/json-iterator/go v1.1.11 github.com/klauspost/compress v1.13.0 github.com/prometheus/client_golang v1.10.0 + github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 // indirect github.com/shirou/gopsutil v3.21.3+incompatible github.com/spf13/viper v1.7.1 // SPIRAL ==== github.com/spiral/endure v1.0.2 github.com/spiral/errors v1.0.11 github.com/spiral/goridge/v3 v3.1.4 - github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 // =========== github.com/stretchr/testify v1.7.0 github.com/tklauser/go-sysconf v0.3.6 // indirect @@ -373,6 +373,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0 h1:D5EMs8zL77qXFJ60vl7x5xRxtezkXsmr8mwypRk5Pe4= +github.com/rabbitmq/amqp091-go v0.0.0-20210714180937-de74e8a7d0e0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -416,7 +418,6 @@ github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdU github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40= github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= -github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 
h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index eafbfb07..c06d05b0 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -2,6 +2,7 @@ package channel import ( "context" + "sync" "sync/atomic" "github.com/spiral/errors" @@ -9,21 +10,62 @@ import ( ) type Vec struct { + sync.RWMutex + // destroy signal destroy uint64 + // channel with the workers workers chan worker.BaseProcess + + len uint64 } -func NewVector(initialNumOfWorkers uint64) *Vec { +func NewVector(len uint64) *Vec { vec := &Vec{ destroy: 0, - workers: make(chan worker.BaseProcess, initialNumOfWorkers), + len: len, + workers: make(chan worker.BaseProcess, len), } return vec } +// Push is O(1) operation +// In case of TTL and full channel O(n) worst case func (v *Vec) Push(w worker.BaseProcess) { - v.workers <- w + // Non-blocking channel send + select { + case v.workers <- w: + // default select branch is only possible when dealing with TTL + // because in that case, workers in the v.workers channel can be TTL-ed and killed + // but presenting in the channel + default: + v.Lock() + defer v.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) 
+ */ + for i := uint64(0); i < v.len; i++ { + wrk := <-v.workers + switch wrk.State().Value() { + // skip good states + case worker.StateWorking, worker.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + v.workers <- wrk + continue + default: + // kill the current worker (just to be sure it's dead) + _ = wrk.Kill() + // replace with the new one + v.workers <- w + return + } + } + } } func (v *Vec) Remove(_ int64) {} @@ -40,6 +82,10 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { return nil, errors.E(errors.WatcherStopped) } + // used only for the TTL-ed workers + v.RLock() + defer v.RUnlock() + select { case w := <-v.workers: return w, nil diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index ca026383..348be199 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -29,7 +29,7 @@ type Vector interface { type workerWatcher struct { sync.RWMutex container Vector - // used to control the Destroy stage (that all workers are in the container) + // used to control Destroy stage (that all workers are in the container) numWorkers uint64 workers []worker.BaseProcess @@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } + // remove worker + ww.Remove(w) + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } + // set state as stopped w.State().Set(worker.StateStopped) - ww.Remove(w) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 714a714a..3ca5c742 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ 
b/plugins/jobs/drivers/amqp/consumer.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -15,7 +16,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/streadway/amqp" ) type JobConsumer struct { @@ -325,7 +325,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -375,7 +375,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -413,7 +413,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, p string) { +func (j *JobConsumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 295ccfd3..6b544620 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -4,10 +4,10 @@ import ( "time" json "github.com/json-iterator/go" + amqp "github.com/rabbitmq/amqp091-go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" - 
"github.com/streadway/amqp" ) type Item struct { diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go index 8011aa3b..0b1cd2dc 100644 --- a/plugins/jobs/drivers/amqp/listener.go +++ b/plugins/jobs/drivers/amqp/listener.go @@ -1,6 +1,6 @@ package amqp -import "github.com/streadway/amqp" +import amqp "github.com/rabbitmq/amqp091-go" func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) { go func() { diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index fd19f1ce..ef2a130a 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -4,10 +4,10 @@ import ( "time" "github.com/cenkalti/backoff/v4" + amqp "github.com/rabbitmq/amqp091-go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/streadway/amqp" ) // redialer used to redial to the rabbitmq in case of the connection interrupts diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 797b4821..ae223f39 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -55,14 +55,16 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger. }, nil } -func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { +// Put the payload +// TODO use the context ?? 
+func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { cp.RLock() defer cp.RUnlock() id, err := cp.t.Put(body, pri, delay, ttr) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return 0, errN } else { @@ -81,14 +83,14 @@ func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. -func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) { +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN } else { @@ -107,7 +109,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { err := cp.conn.Delete(id) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return errN } else { @@ -118,12 +120,14 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { return nil } -func (cp *ConnPool) redial(ctx context.Context) error { +func (cp *ConnPool) redial() error { const op = errors.Op("connection_pool_redial") cp.Lock() // backoff here - expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + expb := backoff.NewExponentialBackOff() + // TODO set via config + expb.MaxElapsedTime = time.Minute operation := func() error { connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) @@ -164,7 +168,7 @@ func (cp *ConnPool) redial(ctx context.Context) error { var connErrors = map[string]struct{}{"EOF": 
{}} -func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { +func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") switch et := err.(type) { //nolint:gocritic // check if the error @@ -172,7 +176,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { switch bErr := et.Err.(type) { case *net.OpError: cp.RUnlock() - errR := cp.redial(ctx) + errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { @@ -185,7 +189,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { if _, ok := connErrors[et.Err.Error()]; ok { // if error is related to the broken connection - redial cp.RUnlock() - errR := cp.redial(ctx) + errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 90da3801..54c8318b 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -192,7 +192,7 @@ func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error return nil } -func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -204,7 +204,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - go j.listen(ctx) + go j.listen() j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, @@ -260,7 +260,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, p string) { +func (j *JobConsumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -276,7 +276,7 @@ func (j *JobConsumer) Resume(ctx context.Context, p 
string) { } // start listener - go j.listen(ctx) + go j.listen() // increase num of listeners atomic.AddUint32(&j.listeners, 1) diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index b872cbd4..aaf635b1 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,19 +1,17 @@ package beanstalk import ( - "context" - "github.com/beanstalkd/go-beanstalk" ) -func (j *JobConsumer) listen(ctx context.Context) { +func (j *JobConsumer) listen() { for { select { case <-j.stopCh: j.log.Warn("beanstalk listener stopped") return default: - id, body, err := j.pool.Reserve(ctx, j.reserveTimeout) + id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { if errB, ok := err.(beanstalk.ConnError); ok { switch errB.Err { //nolint:gocritic diff --git a/tests/env/Dockerfile-beanstalkd.yaml b/tests/env/Dockerfile-beanstalkd.yaml index 7b36f8d3..852385a1 100644 --- a/tests/env/Dockerfile-beanstalkd.yaml +++ b/tests/env/Dockerfile-beanstalkd.yaml @@ -1,13 +1,15 @@ -FROM ubuntu:latest +FROM archlinux:latest ARG DEBIAN_FRONTEND=noninteractive +RUN pacman-key --init -RUN apt-get update && apt-get install -y curl build-essential pkg-config +RUN Y | pacman -Syu --noconfirm +RUN Y | pacman -S --noconfirm curl base-devel pkgconf RUN curl -sL https://github.com/kr/beanstalkd/archive/v1.12.tar.gz | tar xvz -C /tmp WORKDIR /tmp/beanstalkd-1.12 -RUN make +RUN make -j12 RUN cp beanstalkd /usr/bin EXPOSE 11300 diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml index 2573fe7a..dc91e7b6 100644 --- a/tests/env/docker-compose.yaml +++ b/tests/env/docker-compose.yaml @@ -1,4 +1,4 @@ -version: '3' +version: '3.8' services: memcached: @@ -16,9 +16,6 @@ services: toxicproxy: image: shopify/toxiproxy -# ports: -# - "8474:8474" -# - "5673:5673" network_mode: "host" beanstalk: @@ -37,10 +34,6 @@ services: rabbitmq: image: rabbitmq:3-management - environment: - RABBITMQ_DEFAULT_USER: 
guest - RABBITMQ_DEFAULT_PASS: guest - RABBITMQ_DEFAULT_VHOST: / ports: - "15672:15672" - "5672:5672" |