diff options
Diffstat (limited to 'plugins/sqs/consumer.go')
-rw-r--r-- | plugins/sqs/consumer.go | 84 |
1 files changed, 43 insertions, 41 deletions
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index 23203190..dfbda154 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != jb.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } @@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } return nil } -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") - attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ - QueueUrl: j.queueURL, + attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: c.queueURL, AttributeNames: []types.QueueAttributeName{ types.QueueAttributeNameApproximateNumberOfMessages, types.QueueAttributeNameApproximateNumberOfMessagesDelayed, @@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) out := &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), - Queue: *j.queueURL, - Ready: ready(atomic.LoadUint32(&j.listeners)), + Queue: *c.queueURL, + Ready: ready(atomic.LoadUint32(&c.listeners)), } nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) @@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *consumer) Stop(context.Context) error { - j.pauseCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.pauseCh <- struct{}{} + } - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error { return nil } -func (j *consumer) Pause(_ context.Context, p string) { +func (c *consumer) Pause(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop consume - j.pauseCh <- struct{}{} + c.pauseCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ context.Context, p string) { +func (c *consumer) Resume(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + c.log.Warn("sqs listener already in the active state") return } // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) // increase num of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) { }) } -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(c.queueURL) if err != nil { return err } - _, err = j.client.SendMessage(ctx, d) + _, err = c.client.SendMessage(ctx, d) if err != nil { return err } |