summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/config.go)2
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/consumer.go)32
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/item.go)6
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/listener.go)4
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/rabbit_init.go)4
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/redial.go)4
-rw-r--r--plugins/jobs/drivers/amqp/plugin.go5
7 files changed, 31 insertions, 26 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/amqpjobs/config.go
index 1ec089f1..ac2f6e53 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/config.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
// pipeline rabbitmq info
const (
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
index 95df02ec..1931ceaa 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -20,7 +20,11 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+const (
+ pluginName string = "amqp"
+)
+
+type consumer struct {
sync.Mutex
log logger.Logger
pq priorityqueue.Queue
@@ -58,7 +62,7 @@ type JobConsumer struct {
}
// NewAMQPConsumer initializes rabbitmq pipeline
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
globalCfg.InitDefault()
// PARSE CONFIGURATION END -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: e,
@@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer_from_pipeline")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// PARSE CONFIGURATION -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
eh: e,
pq: pq,
@@ -214,7 +218,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
+func (j *consumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -232,12 +236,12 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +291,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("amqp_driver_state")
select {
case pch := <-j.publishChan:
@@ -316,7 +320,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
}
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -354,7 +358,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -413,7 +417,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -427,7 +431,7 @@ func (j *JobConsumer) Stop(context.Context) error {
}
// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
select {
case pch := <-j.publishChan:
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/amqpjobs/item.go
index 623dcca7..a8e305ea 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/item.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -139,7 +139,7 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
item, err := j.unpack(d)
if err != nil {
@@ -194,7 +194,7 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
+func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/amqpjobs/listener.go
index 0b1cd2dc..0156d55c 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/listener.go
@@ -1,8 +1,8 @@
-package amqp
+package amqpjobs
import amqp "github.com/rabbitmq/amqp091-go"
-func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
+func (j *consumer) listener(deliv <-chan amqp.Delivery) {
go func() {
for { //nolint:gosimple
select {
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
index 56ef10c8..e260fabe 100644
--- a/plugins/jobs/drivers/amqp/rabbit_init.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
@@ -1,10 +1,10 @@
-package amqp
+package amqpjobs
import (
"github.com/spiral/errors"
)
-func (j *JobConsumer) initRabbitMQ() error {
+func (j *consumer) initRabbitMQ() error {
const op = errors.Op("jobs_plugin_rmq_init")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/amqpjobs/redial.go
index 8dc18b8f..0835e3ea 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/redial.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"time"
@@ -11,7 +11,7 @@ import (
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobConsumer) redialer() { //nolint:gocognit
+func (j *consumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
index 624f4405..8797d20b 100644
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ b/plugins/jobs/drivers/amqp/plugin.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -31,10 +32,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+ return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
}
// FromPipeline constructs AMQP driver from pipeline
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
+ return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq)
}