summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go43
1 files changed, 41 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 4fb684f8..d957b3eb 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
import (
"context"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -11,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
@@ -263,7 +265,44 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
}
func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+ const op = errors.Op("sqs_state")
+ attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: j.queueURL,
+ AttributeNames: []types.QueueAttributeName{
+ types.QueueAttributeNameApproximateNumberOfMessages,
+ types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+ types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+ },
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: *j.queueURL,
+ }
+
+ nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+ if err == nil {
+ out.Active = int64(nom)
+ }
+
+ delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+ if err == nil {
+ out.Delayed = int64(delayed)
+ }
+
+ nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+ if err == nil {
+ out.Reserved = int64(nv)
+ }
+
+ return out, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -272,7 +311,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+ const op = errors.Op("sqs_run")
j.Lock()
defer j.Unlock()