summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/config.go)2
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/consumer.go)32
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/item.go)6
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/listener.go)4
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/rabbit_init.go)4
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/redial.go)4
-rw-r--r--plugins/jobs/drivers/amqp/plugin.go5
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go26
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go28
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go26
-rw-r--r--plugins/jobs/drivers/sqs/item.go2
-rw-r--r--plugins/jobs/drivers/sqs/listener.go2
14 files changed, 75 insertions, 70 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/amqpjobs/config.go
index 1ec089f1..ac2f6e53 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/config.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
// pipeline rabbitmq info
const (
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
index 95df02ec..1931ceaa 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -20,7 +20,11 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+const (
+ pluginName string = "amqp"
+)
+
+type consumer struct {
sync.Mutex
log logger.Logger
pq priorityqueue.Queue
@@ -58,7 +62,7 @@ type JobConsumer struct {
}
// NewAMQPConsumer initializes rabbitmq pipeline
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
globalCfg.InitDefault()
// PARSE CONFIGURATION END -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: e,
@@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_amqp_consumer_from_pipeline")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// PARSE CONFIGURATION -------
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
eh: e,
pq: pq,
@@ -214,7 +218,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
+func (j *consumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -232,12 +236,12 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +291,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("amqp_driver_state")
select {
case pch := <-j.publishChan:
@@ -316,7 +320,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
}
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -354,7 +358,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -413,7 +417,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -427,7 +431,7 @@ func (j *JobConsumer) Stop(context.Context) error {
}
// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
select {
case pch := <-j.publishChan:
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/amqpjobs/item.go
index 623dcca7..a8e305ea 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/item.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"context"
@@ -139,7 +139,7 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
item, err := j.unpack(d)
if err != nil {
@@ -194,7 +194,7 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
+func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/amqpjobs/listener.go
index 0b1cd2dc..0156d55c 100644
--- a/plugins/jobs/drivers/amqp/listener.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/listener.go
@@ -1,8 +1,8 @@
-package amqp
+package amqpjobs
import amqp "github.com/rabbitmq/amqp091-go"
-func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
+func (j *consumer) listener(deliv <-chan amqp.Delivery) {
go func() {
for { //nolint:gosimple
select {
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
index 56ef10c8..e260fabe 100644
--- a/plugins/jobs/drivers/amqp/rabbit_init.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
@@ -1,10 +1,10 @@
-package amqp
+package amqpjobs
import (
"github.com/spiral/errors"
)
-func (j *JobConsumer) initRabbitMQ() error {
+func (j *consumer) initRabbitMQ() error {
const op = errors.Op("jobs_plugin_rmq_init")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/amqpjobs/redial.go
index 8dc18b8f..0835e3ea 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/amqpjobs/redial.go
@@ -1,4 +1,4 @@
-package amqp
+package amqpjobs
import (
"time"
@@ -11,7 +11,7 @@ import (
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobConsumer) redialer() { //nolint:gocognit
+func (j *consumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
index 624f4405..8797d20b 100644
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ b/plugins/jobs/drivers/amqp/plugin.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -31,10 +32,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+ return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
}
// FromPipeline constructs AMQP driver from pipeline
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
+ return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq)
}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 6323148b..5ef89983 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -19,7 +19,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-type JobConsumer struct {
+type consumer struct {
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -43,7 +43,7 @@ type JobConsumer struct {
requeueCh chan *Item
}
-func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return jc, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_beanstalk_consumer")
// PARSE CONFIGURATION -------
@@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
}
// initialize job consumer
- jc := &JobConsumer{
+ jc := &consumer{
pq: pq,
log: log,
eh: e,
@@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return jc, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("beanstalk_push")
// check if the pipeline registered
@@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+func (j *consumer) handleItem(ctx context.Context, item *Item) error {
const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
@@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("beanstalk_state")
stat, err := j.pool.Stats(ctx)
if err != nil {
@@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index f1d7ac76..0a6cd560 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error {
return nil
}
-func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
+func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index f1385e70..6bb159ea 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -4,7 +4,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
)
-func (j *JobConsumer) listen() {
+func (j *consumer) listen() {
for {
select {
case <-j.stopCh:
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index f0992cd6..91b8eda9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -25,7 +25,7 @@ type Config struct {
Prefetch uint64 `mapstructure:"prefetch"`
}
-type JobConsumer struct {
+type consumer struct {
cfg *Config
log logger.Logger
eh events.Handler
@@ -43,10 +43,10 @@ type JobConsumer struct {
stopCh chan struct{}
}
-func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_ephemeral_pipeline")
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- jb := &JobConsumer{
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -88,7 +88,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
@@ -105,7 +105,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+func (j *consumer) State(_ context.Context) (*jobState.State, error) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
@@ -117,12 +117,12 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
}, nil
}
-func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -149,7 +149,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -175,7 +175,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
@@ -185,7 +185,7 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(ctx context.Context) error {
+func (j *consumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -207,7 +207,7 @@ func (j *JobConsumer) Stop(ctx context.Context) error {
}
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
@@ -245,7 +245,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
}
}
-func (j *JobConsumer) consume() {
+func (j *consumer) consume() {
go func() {
// redirect
for {
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 17af1caa..23203190 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -24,7 +24,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-type JobConsumer struct {
+type consumer struct {
sync.Mutex
pq priorityqueue.Queue
log logger.Logger
@@ -56,7 +56,7 @@ type JobConsumer struct {
pauseCh chan struct{}
}
-func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no such key - error
@@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
globalCfg.InitDefault()
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
return jb, nil
}
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_sqs_consumer")
// if no global section
@@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
}
// initialize job consumer
- jb := &JobConsumer{
+ jb := &consumer{
pq: pq,
log: log,
eh: e,
@@ -227,7 +227,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
@@ -250,7 +250,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
QueueUrl: j.queueURL,
@@ -292,12 +292,12 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
j.Lock()
@@ -323,7 +323,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *consumer) Stop(context.Context) error {
j.pauseCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -336,7 +336,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -364,7 +364,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -393,7 +393,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
})
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
d, err := msg.pack(j.queueURL)
if err != nil {
return err
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index df72b2e5..996adf6c 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
+func (j *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 9efef90d..a4280af2 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -18,7 +18,7 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
+func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
case <-j.pauseCh: