diff options
Diffstat (limited to 'plugins/jobs/drivers')
14 files changed, 75 insertions, 70 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/amqpjobs/config.go index 1ec089f1..ac2f6e53 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/config.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs // pipeline rabbitmq info const ( diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go index 95df02ec..1931ceaa 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "context" @@ -20,7 +20,11 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +const ( + pluginName string = "amqp" +) + +type consumer struct { sync.Mutex log logger.Logger pq priorityqueue.Queue @@ -58,7 +62,7 @@ type JobConsumer struct { } // NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, globalCfg.InitDefault() // PARSE CONFIGURATION END ------- - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: e, @@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_amqp_consumer_from_pipeline") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // PARSE CONFIGURATION ------- - jb := &JobConsumer{ + jb := &consumer{ log: log, eh: e, pq: pq, @@ -214,7 +218,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { +func (j *consumer) Push(ctx context.Context, job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered @@ -232,12 +236,12 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +291,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("amqp_driver_state") select { case pch := <-j.publishChan: @@ -316,7 +320,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { } } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -354,7 +358,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -413,7 +417,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { j.stopCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -427,7 +431,7 @@ func (j *JobConsumer) Stop(context.Context) error { } // handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") select { case pch := <-j.publishChan: diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/amqpjobs/item.go index 623dcca7..a8e305ea 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/item.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "context" @@ -139,7 +139,7 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") item, err := j.unpack(d) if err != nil { @@ -194,7 +194,7 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { +func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/amqpjobs/listener.go index 0b1cd2dc..0156d55c 100644 --- a/plugins/jobs/drivers/amqp/listener.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/listener.go @@ -1,8 +1,8 @@ -package amqp +package amqpjobs import amqp "github.com/rabbitmq/amqp091-go" -func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) { +func (j *consumer) listener(deliv <-chan amqp.Delivery) { go func() { for { //nolint:gosimple select { diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go index 56ef10c8..e260fabe 100644 --- a/plugins/jobs/drivers/amqp/rabbit_init.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go @@ -1,10 +1,10 @@ -package amqp +package amqpjobs import ( "github.com/spiral/errors" ) -func (j *JobConsumer) initRabbitMQ() error { +func (j *consumer) initRabbitMQ() error { const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/amqpjobs/redial.go index 8dc18b8f..0835e3ea 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/amqpjobs/redial.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "time" @@ -11,7 +11,7 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *JobConsumer) redialer() { //nolint:gocognit +func (j *consumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go index 624f4405..8797d20b 100644 --- a/plugins/jobs/drivers/amqp/plugin.go +++ b/plugins/jobs/drivers/amqp/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -31,10 +32,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) + return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) } // FromPipeline constructs AMQP driver from pipeline func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, e, pq) + return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq) } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 6323148b..5ef89983 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -19,7 +19,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +type consumer struct { log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -43,7 +43,7 @@ type JobConsumer struct { requeueCh chan *Item } -func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return jc, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("beanstalk_push") // check if the pipeline registered @@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { +func (j *consumer) handleItem(ctx context.Context, item *Item) error { const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) @@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { // register the pipeline j.pipeline.Store(p) return nil } // State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("beanstalk_state") stat, err := j.pool.Stats(ctx) if err != nil { @@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index f1d7ac76..0a6cd560 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error { return nil } -func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { +func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { return err diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index f1385e70..6bb159ea 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -4,7 +4,7 @@ import ( "github.com/beanstalkd/go-beanstalk" ) -func (j *JobConsumer) listen() { +func (j *consumer) listen() { for { select { case <-j.stopCh: diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index f0992cd6..91b8eda9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -25,7 +25,7 @@ type Config struct { Prefetch uint64 `mapstructure:"prefetch"` } -type JobConsumer struct { +type consumer struct { cfg *Config log logger.Logger eh events.Handler @@ -43,10 +43,10 @@ type JobConsumer struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_ephemeral_pipeline") - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { - jb := &JobConsumer{ +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -88,7 +88,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered @@ -105,7 +105,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { +func (j *consumer) State(_ context.Context) (*jobState.State, error) { pipe := j.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), @@ -117,12 +117,12 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { }, nil } -func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { j.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -149,7 +149,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -175,7 +175,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -185,7 +185,7 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(ctx context.Context) error { +func (j *consumer) Stop(ctx context.Context) error { const op = errors.Op("ephemeral_plugin_stop") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -207,7 +207,7 @@ func (j *JobConsumer) Stop(ctx context.Context) error { } } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("ephemeral_handle_request") // handle timeouts // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) @@ -245,7 +245,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { } } -func (j *JobConsumer) consume() { +func (j *consumer) consume() { go func() { // redirect for { diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 17af1caa..23203190 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -24,7 +24,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -type JobConsumer struct { +type consumer struct { sync.Mutex pq priorityqueue.Queue log logger.Logger @@ -56,7 +56,7 @@ type JobConsumer struct { pauseCh chan struct{} } -func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no such key - error @@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure globalCfg.InitDefault() // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure return jb, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no global section @@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf } // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -227,7 +227,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered @@ -250,7 +250,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ QueueUrl: j.queueURL, @@ -292,12 +292,12 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") j.Lock() @@ -323,7 +323,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { j.pauseCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -336,7 +336,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -364,7 +364,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -393,7 +393,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { d, err := msg.pack(j.queueURL) if err != nil { return err diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index df72b2e5..996adf6c 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { }, nil } -func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { +func (j *consumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 9efef90d..a4280af2 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -18,7 +18,7 @@ const ( NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit +func (j *consumer) listen(ctx context.Context) { //nolint:gocognit for { select { case <-j.pauseCh: |