diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 65 |
1 files changed, 57 insertions, 8 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 9ce37543..5d419b51 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -2,6 +2,7 @@ package sqs import ( "context" + "strconv" "sync" "sync/atomic" "time" @@ -11,10 +12,12 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -261,17 +264,46 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) +func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("sqs_state") + attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: j.queueURL, + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameApproximateNumberOfMessages, + types.QueueAttributeNameApproximateNumberOfMessagesDelayed, + types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, + }, + }) + if err != nil { - return err + return nil, errors.E(op, err) } - _, err = j.client.SendMessage(ctx, d) - if err != nil { - return err + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: *j.queueURL, + Ready: ready(atomic.LoadUint32(&j.listeners)), } - return nil + nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) + if err == nil { + out.Active = int64(nom) + } + + delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) + if err == nil { + out.Delayed = int64(delayed) + } + + nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) + if err == nil { + out.Reserved = int64(nv) + } + + return out, nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -280,7 +312,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") + const op = errors.Op("sqs_run") j.Lock() defer j.Unlock() @@ -374,3 +406,20 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { Start: time.Now(), }) } + +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(j.queueURL) + if err != nil { + return err + } + _, err = j.client.SendMessage(ctx, d) + if err != nil { + return err + } + + return nil +} + +func ready(r uint32) bool { + return r > 0 +} |