diff options
Diffstat (limited to 'plugins/jobs/broker/sqs/queue.go')
-rw-r--r-- | plugins/jobs/broker/sqs/queue.go | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/plugins/jobs/broker/sqs/queue.go b/plugins/jobs/broker/sqs/queue.go new file mode 100644 index 00000000..8a92448e --- /dev/null +++ b/plugins/jobs/broker/sqs/queue.go @@ -0,0 +1,266 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + active int32 + pipe *jobs.Pipeline + url *string + reserve time.Duration + lockReserved time.Duration + + // queue events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) + } + + return &queue{ + pipe: pipe, + reserve: pipe.Duration("reserve", time.Second), + lockReserved: pipe.Duration("lockReserved", 300*time.Second), + lsn: lsn, + }, nil +} + +// declareQueue declared queue +func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { + attr := make(map[string]*string) + for k, v := range q.pipe.Map("declare") { + if vs, ok := v.(string); ok { + attr[k] = aws.String(vs) + } + + if vi, ok := v.(int); ok { + attr[k] = aws.String(strconv.Itoa(vi)) + } + + if vb, ok := v.(bool); ok { + if vb { + attr[k] = aws.String("true") + } else { + attr[k] = aws.String("false") + } + } + } + + if len(attr) != 0 { + r, err := s.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(q.pipe.String("queue", "")), + Attributes: attr, + }) + + return r.QueueUrl, err + } + + // no need to create (get existed) + r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) + if err != nil { + return nil, err + } + + return r.QueueUrl, nil +} + +// serve consumers +func (q *queue) serve(s *sqs.SQS, tout time.Duration) { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.active, 1) + + var errored bool + for { + messages, stop, err := q.consume(s) + if err != nil { + if errored { + // reoccurring error + time.Sleep(tout) + } else { + errored = true + q.report(err) + } + + continue + } + errored = false + + if stop { + return + } + + for _, msg := range messages { + h := <-q.execPool + go func(h jobs.Handler, msg *sqs.Message) { + err := q.do(s, h, msg) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, msg) + } + } +} + +// consume and allocate connection. +func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil, true, nil + default: + r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: q.url, + MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), + WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), + VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), + AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, + MessageAttributeNames: jobAttributes, + }) + if err != nil { + return nil, false, err + } + + q.wg.Add(len(r.Messages)) + + return r.Messages, false, nil + } +} + +// do single message +func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { + id, attempt, j, err := unpack(msg) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + // block the job based on known timeout + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), + }) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + err = h(id, j) + if err == nil { + return q.deleteMessage(s, msg, nil) + } + + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return q.deleteMessage(s, msg, err) + } + + // retry after specified duration + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), + }) + + return err +} + +func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { + _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) + return drr +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + close(q.wait) + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// add job to the queue +func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { + r, err := s.SendMessage(pack(q.url, j)) + if err != nil { + return "", err + } + + return *r.MessageId, nil +} + +// return queue stats +func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { + r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + QueueUrl: q.url, + AttributeNames: []*string{ + aws.String("ApproximateNumberOfMessages"), + aws.String("ApproximateNumberOfMessagesDelayed"), + aws.String("ApproximateNumberOfMessagesNotVisible"), + }, + }) + + if err != nil { + return nil, err + } + + stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} + + for a, v := range r.Attributes { + if a == "ApproximateNumberOfMessages" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Queue = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesNotVisible" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Active = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesDelayed" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Delayed = int64(v) + } + } + } + + return stat, nil +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} |