summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/config.go114
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go411
-rw-r--r--plugins/jobs/drivers/sqs/item.go247
-rw-r--r--plugins/jobs/drivers/sqs/listener.go87
-rw-r--r--plugins/jobs/drivers/sqs/plugin.go39
5 files changed, 0 insertions, 898 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
deleted file mode 100644
index 9b2a1ca8..00000000
--- a/plugins/jobs/drivers/sqs/config.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package sqs
-
-import "github.com/aws/aws-sdk-go-v2/aws"
-
-const (
- attributes string = "attributes"
- tags string = "tags"
- queue string = "queue"
- pref string = "prefetch"
- visibility string = "visibility_timeout"
- waitTime string = "wait_time"
-)
-
-type GlobalCfg struct {
- Key string `mapstructure:"key"`
- Secret string `mapstructure:"secret"`
- Region string `mapstructure:"region"`
- SessionToken string `mapstructure:"session_token"`
- Endpoint string `mapstructure:"endpoint"`
-}
-
-// Config is used to parse pipeline configuration
-type Config struct {
- // The duration (in seconds) that the received messages are hidden from subsequent
- // retrieve requests after being retrieved by a ReceiveMessage request.
- VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
- // The duration (in seconds) for which the call waits for a message to arrive
- // in the queue before returning. If a message is available, the call returns
- // sooner than WaitTimeSeconds. If no messages are available and the wait time
- // expires, the call returns successfully with an empty list of messages.
- WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
- // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
- // than this value (however, fewer messages might be returned). Valid values: 1 to
- // 10. Default: 1.
- Prefetch int32 `mapstructure:"prefetch"`
- // The name of the new queue. The following limits apply to this name:
- //
- // * A queue
- // name can have up to 80 characters.
- //
- // * Valid values: alphanumeric characters,
- // hyphens (-), and underscores (_).
- //
- // * A FIFO queue name must end with the .fifo
- // suffix.
- //
- // Queue URLs and names are case-sensitive.
- //
- // This member is required.
- Queue *string `mapstructure:"queue"`
-
- // A map of attributes with their corresponding values. The following lists the
- // names, descriptions, and values of the special request parameters that the
- // CreateQueue action uses.
- // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
- Attributes map[string]string `mapstructure:"attributes"`
-
- // From amazon docs:
- // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
- // Tagging Your Amazon SQS Queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
- // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
- // guidelines in mind:
- //
- // * Adding more than 50 tags to a queue isn't recommended.
- //
- // *
- // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
- // strings.
- //
- // * Tags are case-sensitive.
- //
- // * A new tag with a key identical to that
- // of an existing tag overwrites the existing tag.
- //
- // For a full list of tag
- // restrictions, see Quotas related to queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
- // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
- // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
- // permissions don't apply to this action. For more information, see Grant
- // cross-account permissions to a role and a user name
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
- // in the Amazon SQS Developer Guide.
- Tags map[string]string `mapstructure:"tags"`
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Endpoint == "" {
- c.Endpoint = "http://127.0.0.1:9324"
- }
-}
-
-func (c *Config) InitDefault() {
- if c.Queue == nil {
- c.Queue = aws.String("default")
- }
-
- if c.Prefetch == 0 || c.Prefetch > 10 {
- c.Prefetch = 10
- }
-
- if c.WaitTimeSeconds == 0 {
- c.WaitTimeSeconds = 5
- }
-
- if c.Attributes == nil {
- c.Attributes = make(map[string]string)
- }
-
- if c.Tags == nil {
- c.Tags = make(map[string]string)
- }
-}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
deleted file mode 100644
index 23203190..00000000
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ /dev/null
@@ -1,411 +0,0 @@
-package sqs
-
-import (
- "context"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/aws/retry"
- "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/credentials"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- "github.com/google/uuid"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type consumer struct {
- sync.Mutex
- pq priorityqueue.Queue
- log logger.Logger
- eh events.Handler
- pipeline atomic.Value
-
- // connection info
- key string
- secret string
- sessionToken string
- region string
- endpoint string
- queue *string
- messageGroupID string
- waitTime int32
- prefetch int32
- visibilityTimeout int32
-
- // if user invoke several resume operations
- listeners uint32
-
- // queue optional parameters
- attributes map[string]string
- tags map[string]string
-
- client *sqs.Client
- queueURL *string
-
- pauseCh chan struct{}
-}
-
-func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- const op = errors.Op("new_sqs_consumer")
-
- // if no such key - error
- if !cfg.Has(configKey) {
- return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
- }
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
- }
-
- // PARSE CONFIGURATION -------
- var pipeCfg Config
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(configKey, &pipeCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipeCfg.InitDefault()
-
- err = cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // initialize job consumer
- jb := &consumer{
- pq: pq,
- log: log,
- eh: e,
- messageGroupID: uuid.NewString(),
- attributes: pipeCfg.Attributes,
- tags: pipeCfg.Tags,
- queue: pipeCfg.Queue,
- prefetch: pipeCfg.Prefetch,
- visibilityTimeout: pipeCfg.VisibilityTimeout,
- waitTime: pipeCfg.WaitTimeSeconds,
- region: globalCfg.Region,
- key: globalCfg.Key,
- sessionToken: globalCfg.SessionToken,
- secret: globalCfg.Secret,
- endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}, 1),
- }
-
- // PARSE CONFIGURATION -------
-
- awsConf, err := config.LoadDefaultConfig(context.Background(),
- config.WithRegion(globalCfg.Region),
- config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // config with retries
- jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
- o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
- opts.MaxAttempts = 60
- })
- })
-
- out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // assign a queue URL
- jb.queueURL = out.QueueUrl
-
- // To successfully create a new queue, you must provide a
- // queue name that adheres to the limits related to queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
- // and is unique within the scope of your queues. After you create a queue, you
- // must wait at least one second after the queue is created to be able to use the <------------
- // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
- time.Sleep(time.Second * 2)
-
- return jb, nil
-}
-
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- const op = errors.Op("new_sqs_consumer")
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
- }
-
- // PARSE CONFIGURATION -------
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- attr := make(map[string]string)
- err = pipe.Map(attributes, attr)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- tg := make(map[string]string)
- err = pipe.Map(tags, tg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // initialize job consumer
- jb := &consumer{
- pq: pq,
- log: log,
- eh: e,
- messageGroupID: uuid.NewString(),
- attributes: attr,
- tags: tg,
- queue: aws.String(pipe.String(queue, "default")),
- prefetch: int32(pipe.Int(pref, 10)),
- visibilityTimeout: int32(pipe.Int(visibility, 0)),
- waitTime: int32(pipe.Int(waitTime, 0)),
- region: globalCfg.Region,
- key: globalCfg.Key,
- sessionToken: globalCfg.SessionToken,
- secret: globalCfg.Secret,
- endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}, 1),
- }
-
- // PARSE CONFIGURATION -------
-
- awsConf, err := config.LoadDefaultConfig(context.Background(),
- config.WithRegion(globalCfg.Region),
- config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // config with retries
- jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
- o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
- opts.MaxAttempts = 60
- })
- })
-
- out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // assign a queue URL
- jb.queueURL = out.QueueUrl
-
- // To successfully create a new queue, you must provide a
- // queue name that adheres to the limits related to queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
- // and is unique within the scope of your queues. After you create a queue, you
- // must wait at least one second after the queue is created to be able to use the <------------
- // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
- time.Sleep(time.Second * 2)
-
- return jb, nil
-}
-
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
- const op = errors.Op("sqs_push")
- // check if the pipeline registered
-
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != jb.Options.Pipeline {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
- }
-
- // The length of time, in seconds, for which to delay a specific message. Valid
- // values: 0 to 900. Maximum: 15 minutes.
- if jb.Options.Delay > 900 {
- return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
- }
-
- err := j.handleItem(ctx, fromJob(jb))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
- const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
- AttributeNames: []types.QueueAttributeName{
- types.QueueAttributeNameApproximateNumberOfMessages,
- types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
- types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
- },
- })
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- out := &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
- }
-
- nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
- if err == nil {
- out.Active = int64(nom)
- }
-
- delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
- if err == nil {
- out.Delayed = int64(delayed)
- }
-
- nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
- if err == nil {
- out.Reserved = int64(nv)
- }
-
- return out, nil
-}
-
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
- return nil
-}
-
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("sqs_run")
-
- j.Lock()
- defer j.Unlock()
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p.Name() {
- return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
- }
-
- atomic.AddUint32(&j.listeners, 1)
-
- // start listener
- go j.listen(context.Background())
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-
- return nil
-}
-
-func (j *consumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
- return nil
-}
-
-func (j *consumer) Pause(_ context.Context, p string) {
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&j.listeners, ^uint32(0))
-
- // stop consume
- j.pauseCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *consumer) Resume(_ context.Context, p string) {
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 1 {
- j.log.Warn("sqs listener already in the active state")
- return
- }
-
- // start listener
- go j.listen(context.Background())
-
- // increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: time.Now(),
- })
-}
-
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
- if err != nil {
- return err
- }
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func ready(r uint32) bool {
- return r > 0
-}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
deleted file mode 100644
index 996adf6c..00000000
--- a/plugins/jobs/drivers/sqs/item.go
+++ /dev/null
@@ -1,247 +0,0 @@
-package sqs
-
-import (
- "context"
- "strconv"
- "time"
-
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- StringType string = "String"
- NumberType string = "Number"
- BinaryType string = "Binary"
- ApproximateReceiveCount string = "ApproximateReceiveCount"
-)
-
-var itemAttributes = []string{
- job.RRJob,
- job.RRDelay,
- job.RRPriority,
- job.RRHeaders,
-}
-
-type Item struct {
- // Job contains pluginName of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-values pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-
- // Private ================
- approxReceiveCount int64
- queue *string
- receiptHandler *string
- client *sqs.Client
- requeueFn func(context.Context, *Item) error
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-func (i *Item) ID() string {
- return i.Ident
-}
-
-func (i *Item) Priority() int64 {
- return i.Options.Priority
-}
-
-// Body packs job payload into binary payload.
-func (i *Item) Body() []byte {
- return utils.AsBytes(i.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-// Not used in the sqs, MessageAttributes used instead
-func (i *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
- )
-
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
-}
-
-func (i *Item) Ack() error {
- _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: i.Options.queue,
- ReceiptHandle: i.Options.receiptHandler,
- })
-
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (i *Item) Nack() error {
- // requeue message
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: i.Options.queue,
- ReceiptHandle: i.Options.receiptHandler,
- })
-
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- // overwrite the delay
- i.Options.Delay = delay
- i.Headers = headers
-
- // requeue message
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- // Delete job from the queue only after successful requeue
- _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: i.Options.queue,
- ReceiptHandle: i.Options.receiptHandler,
- })
-
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Headers: job.Headers,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- },
- }
-}
-
-func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
- // pack headers map
- data, err := json.Marshal(i.Headers)
- if err != nil {
- return nil, err
- }
-
- return &sqs.SendMessageInput{
- MessageBody: aws.String(i.Payload),
- QueueUrl: queue,
- DelaySeconds: int32(i.Options.Delay),
- MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
- job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
- },
- }, nil
-}
-
-func (j *consumer) unpack(msg *types.Message) (*Item, error) {
- const op = errors.Op("sqs_unpack")
- // reserved
- if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
- return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
- }
-
- for i := 0; i < len(itemAttributes); i++ {
- if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok {
- return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i]))
- }
- }
-
- var h map[string][]string
- err := json.Unmarshal(msg.MessageAttributes[job.RRHeaders].BinaryValue, &h)
- if err != nil {
- return nil, err
- }
-
- delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- item := &Item{
- Job: *msg.MessageAttributes[job.RRJob].StringValue,
- Payload: *msg.Body,
- Headers: h,
- Options: &Options{
- Delay: int64(delay),
- Priority: int64(priority),
-
- // private
- approxReceiveCount: int64(recCount),
- client: j.client,
- queue: j.queueURL,
- receiptHandler: msg.ReceiptHandle,
- requeueFn: j.handleItem,
- },
- }
-
- return item, nil
-}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
deleted file mode 100644
index a4280af2..00000000
--- a/plugins/jobs/drivers/sqs/listener.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package sqs
-
-import (
- "context"
- "time"
-
- "github.com/aws/aws-sdk-go-v2/aws/transport/http"
- "github.com/aws/aws-sdk-go-v2/service/sqs"
- "github.com/aws/aws-sdk-go-v2/service/sqs/types"
- "github.com/aws/smithy-go"
-)
-
-const (
- // All - get all message attribute names
- All string = "All"
-
- // NonExistentQueue AWS error code
- NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
-)
-
-func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
- for {
- select {
- case <-j.pauseCh:
- j.log.Warn("sqs listener stopped")
- return
- default:
- message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: j.queueURL,
- MaxNumberOfMessages: j.prefetch,
- AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
- MessageAttributeNames: []string{All},
- // The new value for the message's visibility timeout (in seconds). Values range: 0
- // to 43200. Maximum: 12 hours.
- VisibilityTimeout: j.visibilityTimeout,
- WaitTimeSeconds: j.waitTime,
- })
-
- if err != nil {
- if oErr, ok := (err).(*smithy.OperationError); ok {
- if rErr, ok := oErr.Err.(*http.ResponseError); ok {
- if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
- // in case of NonExistentQueue - recreate the queue
- if apiErr.Code == NonExistentQueue {
- j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
- _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
- if err != nil {
- j.log.Error("create queue", "error", err)
- }
- // To successfully create a new queue, you must provide a
- // queue name that adheres to the limits related to the queues
- // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
- // and is unique within the scope of your queues. After you create a queue, you
- // must wait at least one second after the queue is created to be able to use the <------------
- // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
- time.Sleep(time.Second * 2)
- continue
- }
- }
- }
- }
-
- j.log.Error("receive message", "error", err)
- continue
- }
-
- for i := 0; i < len(message.Messages); i++ {
- m := message.Messages[i]
- item, err := j.unpack(&m)
- if err != nil {
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
- ReceiptHandle: m.ReceiptHandle,
- })
- if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
- }
-
- j.log.Error("message unpack", "error", err)
- continue
- }
-
- j.pq.Insert(item)
- }
- }
- }
-}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
deleted file mode 100644
index 54f61ff5..00000000
--- a/plugins/jobs/drivers/sqs/plugin.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- pluginName string = "sqs"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) Name() string {
- return pluginName
-}
-
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
-}
-
-func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, e, pq)
-}