summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go65
1 files changed, 57 insertions, 8 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 9ce37543..5d419b51 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
import (
"context"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -11,10 +12,12 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -261,17 +264,46 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("sqs_state")
+ attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: j.queueURL,
+ AttributeNames: []types.QueueAttributeName{
+ types.QueueAttributeNameApproximateNumberOfMessages,
+ types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+ types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+ },
+ })
+
if err != nil {
- return err
+ return nil, errors.E(op, err)
}
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: *j.queueURL,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}
- return nil
+ nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+ if err == nil {
+ out.Active = int64(nom)
+ }
+
+ delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+ if err == nil {
+ out.Delayed = int64(delayed)
+ }
+
+ nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+ if err == nil {
+ out.Reserved = int64(nv)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -280,7 +312,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+ const op = errors.Op("sqs_run")
j.Lock()
defer j.Unlock()
@@ -374,3 +406,20 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(j.queueURL)
+ if err != nil {
+ return err
+ }
+ _, err = j.client.SendMessage(ctx, d)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}