summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
committerValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
commitc90c11b92e229280477a9b049e65ca1048825dd4 (patch)
tree2a38695cad6dc3095b291575cfb40bc56820d86d /plugins/jobs/drivers/amqp
parent1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff)
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go8
-rw-r--r--plugins/jobs/drivers/amqp/item.go2
-rw-r--r--plugins/jobs/drivers/amqp/listener.go2
-rw-r--r--plugins/jobs/drivers/amqp/redial.go2
4 files changed, 7 insertions, 7 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 714a714a..3ca5c742 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -15,7 +16,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/streadway/amqp"
)
type JobConsumer struct {
@@ -325,7 +325,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -375,7 +375,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -413,7 +413,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 295ccfd3..6b544620 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -4,10 +4,10 @@ import (
"time"
json "github.com/json-iterator/go"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
- "github.com/streadway/amqp"
)
type Item struct {
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
index 8011aa3b..0b1cd2dc 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -1,6 +1,6 @@
package amqp
-import "github.com/streadway/amqp"
+import amqp "github.com/rabbitmq/amqp091-go"
func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
go func() {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index fd19f1ce..ef2a130a 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -4,10 +4,10 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
+ amqp "github.com/rabbitmq/amqp091-go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/streadway/amqp"
)
// redialer used to redial to the rabbitmq in case of the connection interrupts