summaryrefslogtreecommitdiff
path: root/plugins/sqs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
committerValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
commitc7d9385f135853539100430521042f7e7e2ae005 (patch)
tree588f45f6cfcd716bb3197ebff8cfdbc86a984afc /plugins/sqs
parentf6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff)
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/sqs')
-rw-r--r--plugins/sqs/consumer.go84
-rw-r--r--plugins/sqs/item.go8
-rw-r--r--plugins/sqs/listener.go36
3 files changed, 65 insertions, 63 deletions
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index 23203190..dfbda154 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != jb.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
@@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
+ attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: c.queueURL,
AttributeNames: []types.QueueAttributeName{
types.QueueAttributeNameApproximateNumberOfMessages,
types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
@@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
out := &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Queue: *c.queueURL,
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}
nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
@@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.pauseCh <- struct{}{}
+ }
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error {
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop consume
- j.pauseCh <- struct{}{}
+ c.pauseCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ c.log.Warn("sqs listener already in the active state")
return
}
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
// increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) {
})
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(c.queueURL)
if err != nil {
return err
}
- _, err = j.client.SendMessage(ctx, d)
+ _, err = c.client.SendMessage(ctx, d)
if err != nil {
return err
}
diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go
index 996adf6c..4e33e99e 100644
--- a/plugins/sqs/item.go
+++ b/plugins/sqs/item.go
@@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *consumer) unpack(msg *types.Message) (*Item, error) {
+func (c *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -236,10 +236,10 @@ func (j *consumer) unpack(msg *types.Message) (*Item, error) {
// private
approxReceiveCount: int64(recCount),
- client: j.client,
- queue: j.queueURL,
+ client: c.client,
+ queue: c.queueURL,
receiptHandler: msg.ReceiptHandle,
- requeueFn: j.handleItem,
+ requeueFn: c.handleItem,
},
}
diff --git a/plugins/sqs/listener.go b/plugins/sqs/listener.go
index a4280af2..215dd6a5 100644
--- a/plugins/sqs/listener.go
+++ b/plugins/sqs/listener.go
@@ -18,22 +18,22 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
+func (c *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
- case <-j.pauseCh:
- j.log.Warn("sqs listener stopped")
+ case <-c.pauseCh:
+ c.log.Warn("sqs listener stopped")
return
default:
- message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: j.queueURL,
- MaxNumberOfMessages: j.prefetch,
+ message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: c.queueURL,
+ MaxNumberOfMessages: c.prefetch,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
MessageAttributeNames: []string{All},
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- VisibilityTimeout: j.visibilityTimeout,
- WaitTimeSeconds: j.waitTime,
+ VisibilityTimeout: c.visibilityTimeout,
+ WaitTimeSeconds: c.waitTime,
})
if err != nil {
@@ -42,10 +42,10 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
// in case of NonExistentQueue - recreate the queue
if apiErr.Code == NonExistentQueue {
- j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
- _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags})
if err != nil {
- j.log.Error("create queue", "error", err)
+ c.log.Error("create queue", "error", err)
}
// To successfully create a new queue, you must provide a
// queue name that adheres to the limits related to the queues
@@ -60,27 +60,27 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
}
}
- j.log.Error("receive message", "error", err)
+ c.log.Error("receive message", "error", err)
continue
}
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := j.unpack(&m)
+ item, err := c.unpack(&m)
if err != nil {
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
+ _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: c.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ c.log.Error("message unpack, failed to delete the message from the queue", "error", err)
}
- j.log.Error("message unpack", "error", err)
+ c.log.Error("message unpack", "error", err)
continue
}
- j.pq.Insert(item)
+ c.pq.Insert(item)
}
}
}