summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
committerValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
commitc90c11b92e229280477a9b049e65ca1048825dd4 (patch)
tree2a38695cad6dc3095b291575cfb40bc56820d86d /plugins/jobs/drivers
parent1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff)
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go8
-rw-r--r--plugins/jobs/drivers/amqp/item.go2
-rw-r--r--plugins/jobs/drivers/amqp/listener.go2
-rw-r--r--plugins/jobs/drivers/amqp/redial.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go24
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go8
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go6
7 files changed, 27 insertions, 25 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 714a714a..3ca5c742 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -15,7 +16,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/streadway/amqp"
)
type JobConsumer struct {
@@ -325,7 +325,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -375,7 +375,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -413,7 +413,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 295ccfd3..6b544620 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -4,10 +4,10 @@ import (
"time"
json "github.com/json-iterator/go"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
- "github.com/streadway/amqp"
)
type Item struct {
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
index 8011aa3b..0b1cd2dc 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -1,6 +1,6 @@
package amqp
-import "github.com/streadway/amqp"
+import amqp "github.com/rabbitmq/amqp091-go"
func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
go func() {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index fd19f1ce..ef2a130a 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -4,10 +4,10 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/streadway/amqp"
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 797b4821..ae223f39 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -55,14 +55,16 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger.
}, nil
}
-func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+// Put the payload
+// TODO use the context ??
+func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
cp.RLock()
defer cp.RUnlock()
id, err := cp.t.Put(body, pri, delay, ttr)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(ctx, err)
+ errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errN
} else {
@@ -81,14 +83,14 @@ func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) {
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(ctx, err)
+ errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errN
} else {
@@ -107,7 +109,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
err := cp.conn.Delete(id)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(ctx, err)
+ errN := cp.checkAndRedial(err)
if errN != nil {
return errN
} else {
@@ -118,12 +120,14 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
return nil
}
-func (cp *ConnPool) redial(ctx context.Context) error {
+func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")
cp.Lock()
// backoff here
- expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
+ expb := backoff.NewExponentialBackOff()
+ // TODO set via config
+ expb.MaxElapsedTime = time.Minute
operation := func() error {
connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
@@ -164,7 +168,7 @@ func (cp *ConnPool) redial(ctx context.Context) error {
var connErrors = map[string]struct{}{"EOF": {}}
-func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
+func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
switch et := err.(type) { //nolint:gocritic
// check if the error
@@ -172,7 +176,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
switch bErr := et.Err.(type) {
case *net.OpError:
cp.RUnlock()
- errR := cp.redial(ctx)
+ errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
@@ -185,7 +189,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
if _, ok := connErrors[et.Err.Error()]; ok {
// if error is related to the broken connection - redial
cp.RUnlock()
- errR := cp.redial(ctx)
+ errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 90da3801..54c8318b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -192,7 +192,7 @@ func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error
return nil
}
-func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -204,7 +204,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- go j.listen(ctx)
+ go j.listen()
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
@@ -260,7 +260,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -276,7 +276,7 @@ func (j *JobConsumer) Resume(ctx context.Context, p string) {
}
// start listener
- go j.listen(ctx)
+ go j.listen()
// increase num of listeners
atomic.AddUint32(&j.listeners, 1)
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index b872cbd4..aaf635b1 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,19 +1,17 @@
package beanstalk
import (
- "context"
-
"github.com/beanstalkd/go-beanstalk"
)
-func (j *JobConsumer) listen(ctx context.Context) {
+func (j *JobConsumer) listen() {
for {
select {
case <-j.stopCh:
j.log.Warn("beanstalk listener stopped")
return
default:
- id, body, err := j.pool.Reserve(ctx, j.reserveTimeout)
+ id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
if errB, ok := err.(beanstalk.ConnError); ok {
switch errB.Err { //nolint:gocritic