diff options
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 22 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 30 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/requeue.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 115 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 46 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/requeue.go | 25 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 21 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 65 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/requeue.go | 25 |
12 files changed, 265 insertions, 138 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index ae223f39..32ca4188 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -61,6 +61,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t cp.RLock() defer cp.RUnlock() + // TODO(rustatian): redial based on the token id, err := cp.t.Put(body, pri, delay, ttr) if err != nil { // errN contains both, err and internal checkAndRedial error @@ -82,7 +83,6 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t // // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. - func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() @@ -126,7 +126,7 @@ func (cp *ConnPool) redial() error { cp.Lock() // backoff here expb := backoff.NewExponentialBackOff() - // TODO set via config + // TODO(rustatian) set via config expb.MaxElapsedTime = time.Minute operation := func() error { diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 54c8318b..b57b22ac 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -36,7 +36,8 @@ type JobConsumer struct { tubePriority uint32 priority int64 - stopCh chan struct{} + stopCh chan struct{} + requeueCh chan *Item } func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -88,9 +89,12 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config // buffered with two because jobs root plugin can call Stop at the same time as Pause stopCh: make(chan struct{}, 2), - reconnectCh: make(chan struct{}), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), } + jc.requeueListener() + return jc, nil } @@ -135,9 +139,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // buffered with two because jobs root plugin can call Stop at the same time as Pause stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), reconnectCh: make(chan struct{}, 2), } + jc.requeueListener() + return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { @@ -150,7 +157,16 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } - item := fromJob(jb) + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { + const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) bb.Grow(64) diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 7c792b46..91dbf41c 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -7,6 +7,7 @@ import ( "github.com/beanstalkd/go-beanstalk" json "github.com/json-iterator/go" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -40,12 +41,19 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay int64 `json:"delay,omitempty"` - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + // Reserve defines for how broker should wait until treating job are failed. + // - <ttr> -- time to run -- is an integer number of seconds to allow a worker + // to run this job. This time is counted from the moment a worker reserves + // this job. If the worker does not delete, release, or bury the job within + // <ttr> seconds, the job will time out and the server will release the job. + // The minimum ttr is 1. If the client sends 0, the server will silently + // increase the ttr to 1. Maximum ttr is 2**32-1. Timeout int64 `json:"timeout,omitempty"` // Private ================ - id uint64 - conn *beanstalk.Conn + id uint64 + conn *beanstalk.Conn + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -103,8 +111,15 @@ func (i *Item) Nack() error { return i.Options.conn.Delete(i.Options.id) } -func (i *Item) Requeue(_ int64) error { - return nil +func (i *Item) Requeue(delay int64) error { + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return nil + default: + return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + } } func fromJob(job *job.Job) *Item { @@ -131,13 +146,14 @@ func (i *Item) pack(b *bytes.Buffer) error { return nil } -func unpack(id uint64, data []byte, conn *beanstalk.Conn, out *Item) error { +func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { return err } - out.Options.conn = conn + out.Options.conn = j.pool.conn out.Options.id = id + out.Options.requeueCh = j.requeueCh return nil } diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index aaf635b1..f1385e70 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -26,7 +26,7 @@ func (j *JobConsumer) listen() { } item := &Item{} - err = unpack(id, body, j.pool.conn, item) + err = j.unpack(id, body, item) if err != nil { j.log.Error("beanstalk unpack item", "error", err) continue diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go new file mode 100644 index 00000000..21053940 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/requeue.go @@ -0,0 +1,24 @@ +package beanstalk + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 043da118..050d74b9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -31,9 +31,11 @@ type JobConsumer struct { pq priorityqueue.Queue localPrefetch chan *Item + // time.sleep goroutines max number goroutinesMaxNum uint64 - stopCh chan struct{} + requeueCh chan *Item + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -45,6 +47,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -61,6 +64,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // consume from the queue go jb.consume() + jb.requeueListener() return jb, nil } @@ -72,6 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // initialize a local queue @@ -79,6 +84,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand // consume from the queue go jb.consume() + jb.requeueListener() return jb, nil } @@ -87,45 +93,54 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok { - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) - } - - msg := fromJob(jb) - // handle timeouts - // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. - if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { - return errors.E(op, errors.Str("max concurrency number reached")) - } + b, ok := j.pipeline.Load(jb.Options.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + } - go func(jj *job.Job) { - atomic.AddUint64(&j.goroutinesMaxNum, 1) - time.Sleep(jj.Options.DelayDuration()) + if !b.(bool) { + return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) + } - // send the item after timeout expired - j.localPrefetch <- msg + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } - atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) - }(jb) + return nil +} - return nil +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("ephemeral_handle_request") + // handle timeouts + // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { + return errors.E(op, errors.Str("max concurrency number reached")) } - // insert to the local, limited pipeline - select { - case j.localPrefetch <- msg: - default: - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch)) - } + go func(jj *Item) { + atomic.AddUint64(&j.goroutinesMaxNum, 1) + time.Sleep(jj.Options.DelayDuration()) + + // send the item after timeout expired + j.localPrefetch <- jj + + atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) + }(msg) return nil } - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + // insert to the local, limited pipeline + select { + case j.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + } } func (j *JobConsumer) consume() { @@ -133,6 +148,9 @@ func (j *JobConsumer) consume() { for { select { case item := <-j.localPrefetch: + + // set requeue channel + item.Options.requeueCh = j.requeueCh j.pq.Insert(item) case <-j.stopCh: return @@ -140,7 +158,7 @@ func (j *JobConsumer) consume() { } } -func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) @@ -151,12 +169,14 @@ func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) return nil } -func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { +func (j *JobConsumer) Pause(_ context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off j.pipeline.Store(pipeline, false) } + // if not true - do not send the EventPipeStopped, because pipe already stopped + return } j.eh.Push(events.JobEvent{ @@ -167,12 +187,15 @@ func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { +func (j *JobConsumer) Resume(_ context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned on j.pipeline.Store(pipeline, true) } + + // if not true - do not send the EventPipeActive, because pipe already active + return } j.eh.Push(events.JobEvent{ @@ -184,7 +207,7 @@ func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -194,7 +217,8 @@ func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *JobConsumer) Stop(ctx context.Context) error { + const op = errors.Op("ephemeral_plugin_stop") var pipe string j.pipeline.Range(func(key, _ interface{}) bool { pipe = key.(string) @@ -202,15 +226,18 @@ func (j *JobConsumer) Stop(context.Context) error { return true }) + select { // return from the consumer - j.stopCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe, - Start: time.Now(), - Elapsed: 0, - }) + case j.stopCh <- struct{}{}: + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe, + Start: time.Now(), + Elapsed: 0, + }) + return nil - return nil + case <-ctx.Done(): + return errors.E(op, ctx.Err()) + } } diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 8560f10a..d140c9ed 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -8,20 +8,6 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - Timeout: job.Options.Timeout, - }, - } -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -53,6 +39,9 @@ type Options struct { // Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -111,6 +100,33 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(_ int64) error { +func (i *Item) Requeue(delay int64) error { + go func() { + time.Sleep(time.Second * time.Duration(delay)) + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return + default: + // TODO(rustatian): logs? + return + } + }() + return nil } + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go new file mode 100644 index 00000000..afb97d54 --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/requeue.go @@ -0,0 +1,25 @@ +package ephemeral + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + // TODO(rustatian): what timeout to use? + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index b81d08e5..8d93b12c 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,7 +50,8 @@ type JobConsumer struct { client *sqs.Client queueURL *string - pauseCh chan struct{} + requeueCh chan *Item + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -102,6 +103,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -136,6 +138,8 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) + jb.requeueListener() + return jb, nil } @@ -201,6 +205,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -235,6 +240,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) + jb.requeueListener() + return jb, nil } @@ -254,13 +261,19 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) } - msg := fromJob(jb) + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + return nil +} +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. _, err := j.client.SendMessage(ctx, msg.pack(j.queueURL)) if err != nil { - return errors.E(op, err) + return err } return nil @@ -310,7 +323,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index a3039d1b..ea4ac8b7 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -60,23 +60,12 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` - // Maximum number of attempts to receive and process the message - MaxAttempts int64 `json:"max_attempts,omitempty"` - // Private ================ approxReceiveCount int64 queue *string receiptHandler *string client *sqs.Client -} - -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry() bool { - // Attempts 1 and 0 has identical effect - if o.MaxAttempts == 0 || o.MaxAttempts == 1 { - return false - } - return o.MaxAttempts > (o.approxReceiveCount + 1) + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -140,10 +129,6 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - if i.Options.CanRetry() { - return nil - } - _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: i.Options.queue, ReceiptHandle: i.Options.receiptHandler, @@ -156,8 +141,15 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(_ int64) error { - return nil +func (i *Item) Requeue(delay int64) error { + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return nil + default: + return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + } } func fromJob(job *job.Job) *Item { @@ -167,11 +159,10 @@ func fromJob(job *job.Job) *Item { Payload: job.Payload, Headers: job.Headers, Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - Timeout: job.Options.Timeout, - MaxAttempts: job.Options.Attempts, + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, }, } } @@ -182,16 +173,15 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput { QueueUrl: queue, DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, - job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, - job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))}, + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, + job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, }, } } -func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) { +func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -204,11 +194,6 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error } } - attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) if err != nil { return nil, errors.E(op, err) @@ -233,16 +218,16 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error Job: *msg.MessageAttributes[job.RRJob].StringValue, Payload: *msg.Body, Options: &Options{ - Delay: int64(delay), - Timeout: int64(to), - Priority: int64(priority), - MaxAttempts: int64(attempt), + Delay: int64(delay), + Timeout: int64(to), + Priority: int64(priority), // private approxReceiveCount: int64(recCount), - client: client, - queue: queue, + client: j.client, + queue: j.queue, receiptHandler: msg.ReceiptHandle, + requeueCh: j.requeueCh, }, } diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index e2323fa3..b72ac065 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -64,7 +64,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := unpack(&m, j.queueURL, j.client) + item, err := j.unpack(&m) if err != nil { _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: j.queueURL, @@ -78,27 +78,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit continue } - // No retry - if item.Options.MaxAttempts == 0 { - j.pq.Insert(item) - continue - } - - // MaxAttempts option specified - if item.Options.CanRetry() { - j.pq.Insert(item) - continue - } - - // If MaxAttempts is more than 0, and can't retry -> delete the message - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, - ReceiptHandle: m.ReceiptHandle, - }) - if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) - continue - } + j.pq.Insert(item) } } } diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go new file mode 100644 index 00000000..87e885e0 --- /dev/null +++ b/plugins/jobs/drivers/sqs/requeue.go @@ -0,0 +1,25 @@ +package sqs + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + // TODO(rustatian): what context to use + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} |