summaryrefslogtreecommitdiff
path: root/plugins/sqs/listener.go
blob: 215dd6a57c93bd1c0e1eacc0c3d5481987e6f30d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package sqs

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.com/aws/smithy-go"
)

const (
	// All - get all message attribute names
	All string = "All"

	// NonExistentQueue AWS error code
	NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)

func (c *consumer) listen(ctx context.Context) { //nolint:gocognit
	for {
		select {
		case <-c.pauseCh:
			c.log.Warn("sqs listener stopped")
			return
		default:
			message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
				QueueUrl:              c.queueURL,
				MaxNumberOfMessages:   c.prefetch,
				AttributeNames:        []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
				MessageAttributeNames: []string{All},
				// The new value for the message's visibility timeout (in seconds). Values range: 0
				// to 43200. Maximum: 12 hours.
				VisibilityTimeout: c.visibilityTimeout,
				WaitTimeSeconds:   c.waitTime,
			})

			if err != nil {
				if oErr, ok := (err).(*smithy.OperationError); ok {
					if rErr, ok := oErr.Err.(*http.ResponseError); ok {
						if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
							// in case of NonExistentQueue - recreate the queue
							if apiErr.Code == NonExistentQueue {
								c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
								_, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags})
								if err != nil {
									c.log.Error("create queue", "error", err)
								}
								// To successfully create a new queue, you must provide a
								// queue name that adheres to the limits related to the queues
								// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
								// and is unique within the scope of your queues. After you create a queue, you
								// must wait at least one second after the queue is created to be able to use the <------------
								// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
								time.Sleep(time.Second * 2)
								continue
							}
						}
					}
				}

				c.log.Error("receive message", "error", err)
				continue
			}

			for i := 0; i < len(message.Messages); i++ {
				m := message.Messages[i]
				item, err := c.unpack(&m)
				if err != nil {
					_, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
						QueueUrl:      c.queueURL,
						ReceiptHandle: m.ReceiptHandle,
					})
					if errD != nil {
						c.log.Error("message unpack, failed to delete the message from the queue", "error", err)
					}

					c.log.Error("message unpack", "error", err)
					continue
				}

				c.pq.Insert(item)
			}
		}
	}
}