summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/config.go103
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go190
-rw-r--r--plugins/jobs/drivers/sqs/item.go192
-rw-r--r--plugins/jobs/drivers/sqs/listener.go66
-rw-r--r--plugins/jobs/drivers/sqs/plugin.go39
5 files changed, 590 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
new file mode 100644
index 00000000..0b4e8157
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -0,0 +1,103 @@
+package sqs
+
+type GlobalCfg struct {
+ Key string `mapstructure:"key"`
+ Secret string `mapstructure:"secret"`
+ Region string `mapstructure:"region"`
+ SessionToken string `mapstructure:"session_token"`
+ Endpoint string `mapstructure:"endpoint"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ // The duration (in seconds) that the received messages are hidden from subsequent
+ // retrieve requests after being retrieved by a ReceiveMessage request.
+ VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
+ // The duration (in seconds) for which the call waits for a message to arrive
+ // in the queue before returning. If a message is available, the call returns
+ // sooner than WaitTimeSeconds. If no messages are available and the wait time
+ // expires, the call returns successfully with an empty list of messages.
+ WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
+ // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages
+ // than this value (however, fewer messages might be returned). Valid values: 1 to
+ // 10. Default: 1.
+ PrefetchCount int32 `mapstructure:"pipeline_size"`
+ // The name of the new queue. The following limits apply to this name:
+ //
+ // * A queue
+ // name can have up to 80 characters.
+ //
+ // * Valid values: alphanumeric characters,
+ // hyphens (-), and underscores (_).
+ //
+ // * A FIFO queue name must end with the .fifo
+ // suffix.
+ //
+ // Queue URLs and names are case-sensitive.
+ //
+ // This member is required.
+ Queue string `mapstructure:"queue"`
+
+ // A map of attributes with their corresponding values. The following lists the
+ // names, descriptions, and values of the special request parameters that the
+ // CreateQueue action uses.
+ // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
+ Attributes map[string]string `mapstructure:"attributes"`
+
+ // From amazon docs:
+ // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
+ // Tagging Your Amazon SQS Queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
+ // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
+ // guidelines in mind:
+ //
+ // * Adding more than 50 tags to a queue isn't recommended.
+ //
+ // *
+ // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
+ // strings.
+ //
+ // * Tags are case-sensitive.
+ //
+ // * A new tag with a key identical to that
+ // of an existing tag overwrites the existing tag.
+ //
+ // For a full list of tag
+ // restrictions, see Quotas related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
+ // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
+ // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
+ // permissions don't apply to this action. For more information, see Grant
+ // cross-account permissions to a role and a user name
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
+ // in the Amazon SQS Developer Guide.
+ Tags map[string]string `mapstructure:"tags"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Endpoint == "" {
+ c.Endpoint = "http://localhost:9324"
+ }
+}
+
+func (c *Config) InitDefault() {
+ if c.Queue == "" {
+ c.Queue = "default"
+ }
+
+ if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
+ c.PrefetchCount = 10
+ }
+
+ if c.WaitTimeSeconds == 0 {
+ c.WaitTimeSeconds = 5
+ }
+
+ if c.Attributes == nil {
+ c.Attributes = make(map[string]string)
+ }
+
+ if c.Tags == nil {
+ c.Tags = make(map[string]string)
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
new file mode 100644
index 00000000..c0f66589
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -0,0 +1,190 @@
+package sqs
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/retry"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ sync.Mutex
+ pq priorityqueue.Queue
+ log logger.Logger
+ eh events.Handler
+ pipeline atomic.Value
+
+ // connection info
+ key string
+ secret string
+ sessionToken string
+ region string
+ endpoint string
+ queue string
+ messageGroupID string
+ waitTime int32
+ prefetch int32
+ visibilityTimeout int32
+
+ // queue optional parameters
+ attributes map[string]string
+ tags map[string]string
+
+ client *sqs.Client
+ outputQ *sqs.CreateQueueOutput
+
+ pauseCh chan struct{}
+}
+
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_sqs_consumer")
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: pipeCfg.Attributes,
+ tags: pipeCfg.Tags,
+ queue: pipeCfg.Queue,
+ prefetch: pipeCfg.PrefetchCount,
+ visibilityTimeout: pipeCfg.VisibilityTimeout,
+ waitTime: pipeCfg.WaitTimeSeconds,
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return jb, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ return &JobConsumer{}, nil
+}
+
+func (j *JobConsumer) Push(jb *job.Job) error {
+ const op = errors.Op("sqs_push")
+ // check if the pipeline registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ // The length of time, in seconds, for which to delay a specific message. Valid
+ // values: 0 to 900. Maximum: 15 minutes.
+ if jb.Options.Delay > 900 {
+ return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
+ }
+
+ msg := fromJob(jb)
+
+ // The new value for the message's visibility timeout (in seconds). Values range: 0
+ // to 43200. Maximum: 12 hours.
+ _, err := j.client.SendMessage(context.Background(), j.pack(msg))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+ j.pipeline.Store(pipeline)
+ return nil
+}
+
+func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ j.Lock()
+ defer j.Unlock()
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // start listener
+ go j.listen()
+
+ return nil
+}
+
+func (j *JobConsumer) Stop() error {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Pause(pipeline string) {
+ panic("implement me")
+}
+
+func (j *JobConsumer) Resume(pipeline string) {
+ panic("implement me")
+}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
new file mode 100644
index 00000000..ef736be9
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -0,0 +1,192 @@
+package sqs
+
+import (
+ "strconv"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ StringType string = "String"
+ NumberType string = "Number"
+ ApproximateReceiveCount string = "ApproximateReceiveCount"
+)
+
+var attributes = []string{
+ job.RRJob,
+ job.RRDelay,
+ job.RRTimeout,
+ job.RRPriority,
+ job.RRMaxAttempts,
+}
+
+type Item struct {
+ // Job contains pluginName of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-values pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+
+ // Maximum number of attempts to receive and process the message
+ MaxAttempts int64 `json:"max_attempts,omitempty"`
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int64) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.MaxAttempts > (attempt + 1)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (j *Item) ID() string {
+ return j.Ident
+}
+
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (j *Item) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used in the sqs, MessageAttributes used instead
+func (j *Item) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (j *Item) Ack() error {
+ return nil
+}
+
+func (j *Item) Nack() error {
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ MaxAttempts: job.Options.Attempts,
+ },
+ }
+}
+
+func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput {
+ return &sqs.SendMessageInput{
+ MessageBody: aws.String(item.Payload),
+ QueueUrl: j.outputQ.QueueUrl,
+ DelaySeconds: int32(item.Options.Delay),
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))},
+ job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))},
+ },
+ }
+}
+
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
+ const op = errors.Op("sqs_unpack")
+ // reserved
+ if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
+ return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
+ }
+
+ for i := 0; i < len(attributes); i++ {
+ if _, ok := msg.MessageAttributes[attributes[i]]; !ok {
+ return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
+ }
+ }
+
+ attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ item := &Item{
+ Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Payload: *msg.Body,
+ Options: &Options{
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
+ MaxAttempts: int64(attempt),
+ },
+ }
+
+ return item, recCount, nil
+}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
new file mode 100644
index 00000000..a10ce5a6
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -0,0 +1,66 @@
+package sqs
+
+import (
+ "context"
+
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+)
+
+const (
+ All string = "All"
+)
+
+func (j *JobConsumer) listen() {
+ for {
+ select {
+ case <-j.pauseCh:
+ return
+ default:
+ message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ MaxNumberOfMessages: j.prefetch,
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
+ MessageAttributeNames: []string{All},
+ VisibilityTimeout: j.visibilityTimeout,
+ WaitTimeSeconds: j.waitTime,
+ })
+ if err != nil {
+ j.log.Error("receive message", "error", err)
+ continue
+ }
+
+ for i := 0; i < len(message.Messages); i++ {
+ m := message.Messages[i]
+ item, attempt, err := j.unpack(&m)
+ if err != nil {
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ continue
+ }
+
+ j.log.Error("message unpack", "error", err)
+ continue
+ }
+
+ if item.Options.CanRetry(int64(attempt)) {
+ j.pq.Insert(item)
+ continue
+ }
+
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ continue
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
new file mode 100644
index 00000000..54f61ff5
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/plugin.go
@@ -0,0 +1,39 @@
+package sqs
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "sqs"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}