diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r-- | plugins/jobs/drivers/sqs/config.go | 114 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 411 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 247 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 87 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/plugin.go | 39 |
5 files changed, 0 insertions, 898 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go deleted file mode 100644 index 9b2a1ca8..00000000 --- a/plugins/jobs/drivers/sqs/config.go +++ /dev/null @@ -1,114 +0,0 @@ -package sqs - -import "github.com/aws/aws-sdk-go-v2/aws" - -const ( - attributes string = "attributes" - tags string = "tags" - queue string = "queue" - pref string = "prefetch" - visibility string = "visibility_timeout" - waitTime string = "wait_time" -) - -type GlobalCfg struct { - Key string `mapstructure:"key"` - Secret string `mapstructure:"secret"` - Region string `mapstructure:"region"` - SessionToken string `mapstructure:"session_token"` - Endpoint string `mapstructure:"endpoint"` -} - -// Config is used to parse pipeline configuration -type Config struct { - // The duration (in seconds) that the received messages are hidden from subsequent - // retrieve requests after being retrieved by a ReceiveMessage request. - VisibilityTimeout int32 `mapstructure:"visibility_timeout"` - // The duration (in seconds) for which the call waits for a message to arrive - // in the queue before returning. If a message is available, the call returns - // sooner than WaitTimeSeconds. If no messages are available and the wait time - // expires, the call returns successfully with an empty list of messages. - WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"` - // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages - // than this value (however, fewer messages might be returned). Valid values: 1 to - // 10. Default: 1. - Prefetch int32 `mapstructure:"prefetch"` - // The name of the new queue. The following limits apply to this name: - // - // * A queue - // name can have up to 80 characters. - // - // * Valid values: alphanumeric characters, - // hyphens (-), and underscores (_). - // - // * A FIFO queue name must end with the .fifo - // suffix. - // - // Queue URLs and names are case-sensitive. - // - // This member is required. - Queue *string `mapstructure:"queue"` - - // A map of attributes with their corresponding values. The following lists the - // names, descriptions, and values of the special request parameters that the - // CreateQueue action uses. - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html - Attributes map[string]string `mapstructure:"attributes"` - - // From amazon docs: - // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see - // Tagging Your Amazon SQS Queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html) - // in the Amazon SQS Developer Guide. When you use queue tags, keep the following - // guidelines in mind: - // - // * Adding more than 50 tags to a queue isn't recommended. - // - // * - // Tags don't have any semantic meaning. Amazon SQS interprets tags as character - // strings. - // - // * Tags are case-sensitive. - // - // * A new tag with a key identical to that - // of an existing tag overwrites the existing tag. - // - // For a full list of tag - // restrictions, see Quotas related to queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues) - // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you - // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account - // permissions don't apply to this action. For more information, see Grant - // cross-account permissions to a role and a user name - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name) - // in the Amazon SQS Developer Guide. - Tags map[string]string `mapstructure:"tags"` -} - -func (c *GlobalCfg) InitDefault() { - if c.Endpoint == "" { - c.Endpoint = "http://127.0.0.1:9324" - } -} - -func (c *Config) InitDefault() { - if c.Queue == nil { - c.Queue = aws.String("default") - } - - if c.Prefetch == 0 || c.Prefetch > 10 { - c.Prefetch = 10 - } - - if c.WaitTimeSeconds == 0 { - c.WaitTimeSeconds = 5 - } - - if c.Attributes == nil { - c.Attributes = make(map[string]string) - } - - if c.Tags == nil { - c.Tags = make(map[string]string) - } -} diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go deleted file mode 100644 index 23203190..00000000 --- a/plugins/jobs/drivers/sqs/consumer.go +++ /dev/null @@ -1,411 +0,0 @@ -package sqs - -import ( - "context" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/aws/retry" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/google/uuid" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type consumer struct { - sync.Mutex - pq priorityqueue.Queue - log logger.Logger - eh events.Handler - pipeline atomic.Value - - // connection info - key string - secret string - sessionToken string - region string - endpoint string - queue *string - messageGroupID string - waitTime int32 - prefetch int32 - visibilityTimeout int32 - - // if user invoke several resume operations - listeners uint32 - - // queue optional parameters - attributes map[string]string - tags map[string]string - - client *sqs.Client - queueURL *string - - pauseCh chan struct{} -} - -func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_sqs_consumer") - - // if no such key - error - if !cfg.Has(configKey) { - return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) - } - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) - } - - // PARSE CONFIGURATION ------- - var pipeCfg Config - var globalCfg GlobalCfg - - err := cfg.UnmarshalKey(configKey, &pipeCfg) - if err != nil { - return nil, errors.E(op, err) - } - - pipeCfg.InitDefault() - - err = cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // initialize job consumer - jb := &consumer{ - pq: pq, - log: log, - eh: e, - messageGroupID: uuid.NewString(), - attributes: pipeCfg.Attributes, - tags: pipeCfg.Tags, - queue: pipeCfg.Queue, - prefetch: pipeCfg.Prefetch, - visibilityTimeout: pipeCfg.VisibilityTimeout, - waitTime: pipeCfg.WaitTimeSeconds, - region: globalCfg.Region, - key: globalCfg.Key, - sessionToken: globalCfg.SessionToken, - secret: globalCfg.Secret, - endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}, 1), - } - - // PARSE CONFIGURATION ------- - - awsConf, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(globalCfg.Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) - if err != nil { - return nil, errors.E(op, err) - } - - // config with retries - jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { - o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { - opts.MaxAttempts = 60 - }) - }) - - out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) - if err != nil { - return nil, errors.E(op, err) - } - - // assign a queue URL - jb.queueURL = out.QueueUrl - - // To successfully create a new queue, you must provide a - // queue name that adheres to the limits related to queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) - // and is unique within the scope of your queues. After you create a queue, you - // must wait at least one second after the queue is created to be able to use the <------------ - // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require - time.Sleep(time.Second * 2) - - return jb, nil -} - -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_sqs_consumer") - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) - } - - // PARSE CONFIGURATION ------- - var globalCfg GlobalCfg - - err := cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - attr := make(map[string]string) - err = pipe.Map(attributes, attr) - if err != nil { - return nil, errors.E(op, err) - } - - tg := make(map[string]string) - err = pipe.Map(tags, tg) - if err != nil { - return nil, errors.E(op, err) - } - - // initialize job consumer - jb := &consumer{ - pq: pq, - log: log, - eh: e, - messageGroupID: uuid.NewString(), - attributes: attr, - tags: tg, - queue: aws.String(pipe.String(queue, "default")), - prefetch: int32(pipe.Int(pref, 10)), - visibilityTimeout: int32(pipe.Int(visibility, 0)), - waitTime: int32(pipe.Int(waitTime, 0)), - region: globalCfg.Region, - key: globalCfg.Key, - sessionToken: globalCfg.SessionToken, - secret: globalCfg.Secret, - endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}, 1), - } - - // PARSE CONFIGURATION ------- - - awsConf, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(globalCfg.Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) - if err != nil { - return nil, errors.E(op, err) - } - - // config with retries - jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { - o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { - opts.MaxAttempts = 60 - }) - }) - - out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) - if err != nil { - return nil, errors.E(op, err) - } - - // assign a queue URL - jb.queueURL = out.QueueUrl - - // To successfully create a new queue, you must provide a - // queue name that adheres to the limits related to queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) - // and is unique within the scope of your queues. After you create a queue, you - // must wait at least one second after the queue is created to be able to use the <------------ - // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require - time.Sleep(time.Second * 2) - - return jb, nil -} - -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("sqs_push") - // check if the pipeline registered - - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != jb.Options.Pipeline { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) - } - - // The length of time, in seconds, for which to delay a specific message. Valid - // values: 0 to 900. Maximum: 15 minutes. - if jb.Options.Delay > 900 { - return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) - } - - err := j.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { - const op = errors.Op("sqs_state") - attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ - QueueUrl: j.queueURL, - AttributeNames: []types.QueueAttributeName{ - types.QueueAttributeNameApproximateNumberOfMessages, - types.QueueAttributeNameApproximateNumberOfMessagesDelayed, - types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, - }, - }) - - if err != nil { - return nil, errors.E(op, err) - } - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - out := &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: *j.queueURL, - Ready: ready(atomic.LoadUint32(&j.listeners)), - } - - nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) - if err == nil { - out.Active = int64(nom) - } - - delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) - if err == nil { - out.Delayed = int64(delayed) - } - - nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) - if err == nil { - out.Reserved = int64(nv) - } - - return out, nil -} - -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) - return nil -} - -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("sqs_run") - - j.Lock() - defer j.Unlock() - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p.Name() { - return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) - } - - atomic.AddUint32(&j.listeners, 1) - - // start listener - go j.listen(context.Background()) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - return nil -} - -func (j *consumer) Stop(context.Context) error { - j.pauseCh <- struct{}{} - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - -func (j *consumer) Pause(_ context.Context, p string) { - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 0 { - j.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&j.listeners, ^uint32(0)) - - // stop consume - j.pauseCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) Resume(_ context.Context, p string) { - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 1 { - j.log.Warn("sqs listener already in the active state") - return - } - - // start listener - go j.listen(context.Background()) - - // increase num of listeners - atomic.AddUint32(&j.listeners, 1) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) - if err != nil { - return err - } - _, err = j.client.SendMessage(ctx, d) - if err != nil { - return err - } - - return nil -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go deleted file mode 100644 index 996adf6c..00000000 --- a/plugins/jobs/drivers/sqs/item.go +++ /dev/null @@ -1,247 +0,0 @@ -package sqs - -import ( - "context" - "strconv" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - StringType string = "String" - NumberType string = "Number" - BinaryType string = "Binary" - ApproximateReceiveCount string = "ApproximateReceiveCount" -) - -var itemAttributes = []string{ - job.RRJob, - job.RRDelay, - job.RRPriority, - job.RRHeaders, -} - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // Private ================ - approxReceiveCount int64 - queue *string - receiptHandler *string - client *sqs.Client - requeueFn func(context.Context, *Item) error -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -// Not used in the sqs, MessageAttributes used instead -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: i.Options.queue, - ReceiptHandle: i.Options.receiptHandler, - }) - - if err != nil { - return err - } - - return nil -} - -func (i *Item) Nack() error { - // requeue message - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: i.Options.queue, - ReceiptHandle: i.Options.receiptHandler, - }) - - if err != nil { - return err - } - - return nil -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - // requeue message - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - // Delete job from the queue only after successful requeue - _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: i.Options.queue, - ReceiptHandle: i.Options.receiptHandler, - }) - - if err != nil { - return err - } - - return nil -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} - -func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { - // pack headers map - data, err := json.Marshal(i.Headers) - if err != nil { - return nil, err - } - - return &sqs.SendMessageInput{ - MessageBody: aws.String(i.Payload), - QueueUrl: queue, - DelaySeconds: int32(i.Options.Delay), - MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, - job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, - }, - }, nil -} - -func (j *consumer) unpack(msg *types.Message) (*Item, error) { - const op = errors.Op("sqs_unpack") - // reserved - if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { - return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) - } - - for i := 0; i < len(itemAttributes); i++ { - if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok { - return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i])) - } - } - - var h map[string][]string - err := json.Unmarshal(msg.MessageAttributes[job.RRHeaders].BinaryValue, &h) - if err != nil { - return nil, err - } - - delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - - priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - - recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount]) - if err != nil { - return nil, errors.E(op, err) - } - - item := &Item{ - Job: *msg.MessageAttributes[job.RRJob].StringValue, - Payload: *msg.Body, - Headers: h, - Options: &Options{ - Delay: int64(delay), - Priority: int64(priority), - - // private - approxReceiveCount: int64(recCount), - client: j.client, - queue: j.queueURL, - receiptHandler: msg.ReceiptHandle, - requeueFn: j.handleItem, - }, - } - - return item, nil -} diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go deleted file mode 100644 index a4280af2..00000000 --- a/plugins/jobs/drivers/sqs/listener.go +++ /dev/null @@ -1,87 +0,0 @@ -package sqs - -import ( - "context" - "time" - - "github.com/aws/aws-sdk-go-v2/aws/transport/http" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/aws/smithy-go" -) - -const ( - // All - get all message attribute names - All string = "All" - - // NonExistentQueue AWS error code - NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" -) - -func (j *consumer) listen(ctx context.Context) { //nolint:gocognit - for { - select { - case <-j.pauseCh: - j.log.Warn("sqs listener stopped") - return - default: - message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ - QueueUrl: j.queueURL, - MaxNumberOfMessages: j.prefetch, - AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, - MessageAttributeNames: []string{All}, - // The new value for the message's visibility timeout (in seconds). Values range: 0 - // to 43200. Maximum: 12 hours. - VisibilityTimeout: j.visibilityTimeout, - WaitTimeSeconds: j.waitTime, - }) - - if err != nil { - if oErr, ok := (err).(*smithy.OperationError); ok { - if rErr, ok := oErr.Err.(*http.ResponseError); ok { - if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { - // in case of NonExistentQueue - recreate the queue - if apiErr.Code == NonExistentQueue { - j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) - _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) - if err != nil { - j.log.Error("create queue", "error", err) - } - // To successfully create a new queue, you must provide a - // queue name that adheres to the limits related to the queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) - // and is unique within the scope of your queues. After you create a queue, you - // must wait at least one second after the queue is created to be able to use the <------------ - // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require - time.Sleep(time.Second * 2) - continue - } - } - } - } - - j.log.Error("receive message", "error", err) - continue - } - - for i := 0; i < len(message.Messages); i++ { - m := message.Messages[i] - item, err := j.unpack(&m) - if err != nil { - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, - ReceiptHandle: m.ReceiptHandle, - }) - if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) - } - - j.log.Error("message unpack", "error", err) - continue - } - - j.pq.Insert(item) - } - } - } -} diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go deleted file mode 100644 index 54f61ff5..00000000 --- a/plugins/jobs/drivers/sqs/plugin.go +++ /dev/null @@ -1,39 +0,0 @@ -package sqs - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - pluginName string = "sqs" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Available() {} - -func (p *Plugin) Name() string { - return pluginName -} - -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewSQSConsumer(configKey, p.log, p.cfg, e, pq) -} - -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, e, pq) -} |