diff options
author | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
commit | c7d9385f135853539100430521042f7e7e2ae005 (patch) | |
tree | 588f45f6cfcd716bb3197ebff8cfdbc86a984afc /plugins/sqs | |
parent | f6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff) |
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/sqs')
-rw-r--r-- | plugins/sqs/consumer.go | 84 | ||||
-rw-r--r-- | plugins/sqs/item.go | 8 | ||||
-rw-r--r-- | plugins/sqs/listener.go | 36 |
3 files changed, 65 insertions, 63 deletions
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index 23203190..dfbda154 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != jb.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } @@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } return nil } -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") - attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ - QueueUrl: j.queueURL, + attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: c.queueURL, AttributeNames: []types.QueueAttributeName{ types.QueueAttributeNameApproximateNumberOfMessages, types.QueueAttributeNameApproximateNumberOfMessagesDelayed, @@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) out := &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), - Queue: *j.queueURL, - Ready: ready(atomic.LoadUint32(&j.listeners)), + Queue: *c.queueURL, + Ready: ready(atomic.LoadUint32(&c.listeners)), } nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) @@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *consumer) Stop(context.Context) error { - j.pauseCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.pauseCh <- struct{}{} + } - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error { return nil } -func (j *consumer) Pause(_ context.Context, p string) { +func (c *consumer) Pause(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop consume - j.pauseCh <- struct{}{} + c.pauseCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ context.Context, p string) { +func (c *consumer) Resume(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + c.log.Warn("sqs listener already in the active state") return } // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) // increase num of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) { }) } -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(c.queueURL) if err != nil { return err } - _, err = j.client.SendMessage(ctx, d) + _, err = c.client.SendMessage(ctx, d) if err != nil { return err } diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go index 996adf6c..4e33e99e 100644 --- a/plugins/sqs/item.go +++ b/plugins/sqs/item.go @@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { }, nil } -func (j *consumer) unpack(msg *types.Message) (*Item, error) { +func (c *consumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -236,10 +236,10 @@ func (j *consumer) unpack(msg *types.Message) (*Item, error) { // private approxReceiveCount: int64(recCount), - client: j.client, - queue: j.queueURL, + client: c.client, + queue: c.queueURL, receiptHandler: msg.ReceiptHandle, - requeueFn: j.handleItem, + requeueFn: c.handleItem, }, } diff --git a/plugins/sqs/listener.go b/plugins/sqs/listener.go index a4280af2..215dd6a5 100644 --- a/plugins/sqs/listener.go +++ b/plugins/sqs/listener.go @@ -18,22 +18,22 @@ const ( NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *consumer) listen(ctx context.Context) { //nolint:gocognit +func (c *consumer) listen(ctx context.Context) { //nolint:gocognit for { select { - case <-j.pauseCh: - j.log.Warn("sqs listener stopped") + case <-c.pauseCh: + c.log.Warn("sqs listener stopped") return default: - message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ - QueueUrl: j.queueURL, - MaxNumberOfMessages: j.prefetch, + message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: c.queueURL, + MaxNumberOfMessages: c.prefetch, AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, MessageAttributeNames: []string{All}, // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - VisibilityTimeout: j.visibilityTimeout, - WaitTimeSeconds: j.waitTime, + VisibilityTimeout: c.visibilityTimeout, + WaitTimeSeconds: c.waitTime, }) if err != nil { @@ -42,10 +42,10 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { // in case of NonExistentQueue - recreate the queue if apiErr.Code == NonExistentQueue { - j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) - _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) + c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) + _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags}) if err != nil { - j.log.Error("create queue", "error", err) + c.log.Error("create queue", "error", err) } // To successfully create a new queue, you must provide a // queue name that adheres to the limits related to the queues @@ -60,27 +60,27 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit } } - j.log.Error("receive message", "error", err) + c.log.Error("receive message", "error", err) continue } for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := j.unpack(&m) + item, err := c.unpack(&m) if err != nil { - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, + _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: c.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) + c.log.Error("message unpack, failed to delete the message from the queue", "error", err) } - j.log.Error("message unpack", "error", err) + c.log.Error("message unpack", "error", err) continue } - j.pq.Insert(item) + c.pq.Insert(item) } } } |