diff options
author | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
commit | c90c11b92e229280477a9b049e65ca1048825dd4 (patch) | |
tree | 2a38695cad6dc3095b291575cfb40bc56820d86d /plugins/jobs | |
parent | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff) |
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel.
Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1.
Replace third-party amqp091 with the official implementation.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/listener.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 6 |
7 files changed, 27 insertions, 25 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 714a714a..3ca5c742 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -15,7 +16,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/streadway/amqp" ) type JobConsumer struct { @@ -325,7 +325,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -375,7 +375,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -413,7 +413,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, p string) { +func (j *JobConsumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 295ccfd3..6b544620 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -4,10 +4,10 @@ import ( "time" json "github.com/json-iterator/go" + amqp "github.com/rabbitmq/amqp091-go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" - "github.com/streadway/amqp" ) type Item struct { diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go index 8011aa3b..0b1cd2dc 100644 --- a/plugins/jobs/drivers/amqp/listener.go +++ b/plugins/jobs/drivers/amqp/listener.go @@ -1,6 +1,6 @@ package amqp -import "github.com/streadway/amqp" +import amqp "github.com/rabbitmq/amqp091-go" func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) { go func() { diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index fd19f1ce..ef2a130a 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -4,10 +4,10 @@ import ( "time" "github.com/cenkalti/backoff/v4" + amqp "github.com/rabbitmq/amqp091-go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/streadway/amqp" ) // redialer used to redial to the rabbitmq in case of the connection interrupts diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 797b4821..ae223f39 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -55,14 +55,16 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger. }, nil } -func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { +// Put the payload +// TODO use the context ?? +func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { cp.RLock() defer cp.RUnlock() id, err := cp.t.Put(body, pri, delay, ttr) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return 0, errN } else { @@ -81,14 +83,14 @@ func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. -func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) { +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN } else { @@ -107,7 +109,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { err := cp.conn.Delete(id) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return errN } else { @@ -118,12 +120,14 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { return nil } -func (cp *ConnPool) redial(ctx context.Context) error { +func (cp *ConnPool) redial() error { const op = errors.Op("connection_pool_redial") cp.Lock() // backoff here - expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + expb := backoff.NewExponentialBackOff() + // TODO set via config + expb.MaxElapsedTime = time.Minute operation := func() error { connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) @@ -164,7 +168,7 @@ func (cp *ConnPool) redial(ctx context.Context) error { var connErrors = map[string]struct{}{"EOF": {}} -func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { +func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") switch et := err.(type) { //nolint:gocritic // check if the error @@ -172,7 +176,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { switch bErr := et.Err.(type) { case *net.OpError: cp.RUnlock() - errR := cp.redial(ctx) + errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { @@ -185,7 +189,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { if _, ok := connErrors[et.Err.Error()]; ok { // if error is related to the broken connection - redial cp.RUnlock() - errR := cp.redial(ctx) + errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 90da3801..54c8318b 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -192,7 +192,7 @@ func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error return nil } -func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -204,7 +204,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - go j.listen(ctx) + go j.listen() j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, @@ -260,7 +260,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, p string) { +func (j *JobConsumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -276,7 +276,7 @@ func (j *JobConsumer) Resume(ctx context.Context, p string) { } // start listener - go j.listen(ctx) + go j.listen() // increase num of listeners atomic.AddUint32(&j.listeners, 1) diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index b872cbd4..aaf635b1 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,19 +1,17 @@ package beanstalk import ( - "context" - "github.com/beanstalkd/go-beanstalk" ) -func (j *JobConsumer) listen(ctx context.Context) { +func (j *JobConsumer) listen() { for { select { case <-j.stopCh: j.log.Warn("beanstalk listener stopped") return default: - id, body, err := j.pool.Reserve(ctx, j.reserveTimeout) + id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { if errB, ok := err.(beanstalk.ConnError); ok { switch errB.Err { //nolint:gocritic |