summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-10 22:48:31 +0300
committerValery Piashchynski <[email protected]>2021-08-10 22:48:31 +0300
commitd449d9d5aec1eec6d494064299feb1551f88ffe2 (patch)
treea905126b44bcfab29af9b5bc3eddaf5398375975
parenta8a7f4194156440ef3157d8e5d75c43ed0327bcf (diff)
Add support for the jobs-worker protocol for the beanstalk,ephemeral and
sqs drivers Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/informer/interface.go4
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go4
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go22
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go30
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/requeue.go24
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go115
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go46
-rw-r--r--plugins/jobs/drivers/ephemeral/requeue.go25
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go21
-rw-r--r--plugins/jobs/drivers/sqs/item.go65
-rw-r--r--plugins/jobs/drivers/sqs/listener.go24
-rw-r--r--plugins/jobs/drivers/sqs/requeue.go25
13 files changed, 269 insertions, 138 deletions
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index d91ddf9d..bbc1a048 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -19,3 +19,7 @@ type Availabler interface {
// Available method needed to collect all plugins which are available in the runtime.
Available()
}
+
+type JobsStat interface {
+ Stat()
+}
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index ae223f39..32ca4188 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -61,6 +61,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
cp.RLock()
defer cp.RUnlock()
+ // TODO(rustatian): redial based on the token
id, err := cp.t.Put(body, pri, delay, ttr)
if err != nil {
// errN contains both, err and internal checkAndRedial error
@@ -82,7 +83,6 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
//
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-
func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
@@ -126,7 +126,7 @@ func (cp *ConnPool) redial() error {
cp.Lock()
// backoff here
expb := backoff.NewExponentialBackOff()
- // TODO set via config
+ // TODO(rustatian) set via config
expb.MaxElapsedTime = time.Minute
operation := func() error {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 54c8318b..b57b22ac 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -36,7 +36,8 @@ type JobConsumer struct {
tubePriority uint32
priority int64
- stopCh chan struct{}
+ stopCh chan struct{}
+ requeueCh chan *Item
}
func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -88,9 +89,12 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
// buffered with two because jobs root plugin can call Stop at the same time as Pause
stopCh: make(chan struct{}, 2),
- reconnectCh: make(chan struct{}),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
}
+ jc.requeueListener()
+
return jc, nil
}
@@ -135,9 +139,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// buffered with two because jobs root plugin can call Stop at the same time as Pause
stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
reconnectCh: make(chan struct{}, 2),
}
+ jc.requeueListener()
+
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
@@ -150,7 +157,16 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
- item := fromJob(jb)
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+ const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
bb.Grow(64)
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 7c792b46..91dbf41c 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -7,6 +7,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -40,12 +41,19 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
- // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ // Reserve defines for how broker should wait until treating job are failed.
+ // - <ttr> -- time to run -- is an integer number of seconds to allow a worker
+ // to run this job. This time is counted from the moment a worker reserves
+ // this job. If the worker does not delete, release, or bury the job within
+ // <ttr> seconds, the job will time out and the server will release the job.
+ // The minimum ttr is 1. If the client sends 0, the server will silently
+ // increase the ttr to 1. Maximum ttr is 2**32-1.
Timeout int64 `json:"timeout,omitempty"`
// Private ================
- id uint64
- conn *beanstalk.Conn
+ id uint64
+ conn *beanstalk.Conn
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -103,8 +111,15 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
-func (i *Item) Requeue(_ int64) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
func fromJob(job *job.Job) *Item {
@@ -131,13 +146,14 @@ func (i *Item) pack(b *bytes.Buffer) error {
return nil
}
-func unpack(id uint64, data []byte, conn *beanstalk.Conn, out *Item) error {
+func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
return err
}
- out.Options.conn = conn
+ out.Options.conn = j.pool.conn
out.Options.id = id
+ out.Options.requeueCh = j.requeueCh
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index aaf635b1..f1385e70 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -26,7 +26,7 @@ func (j *JobConsumer) listen() {
}
item := &Item{}
- err = unpack(id, body, j.pool.conn, item)
+ err = j.unpack(id, body, item)
if err != nil {
j.log.Error("beanstalk unpack item", "error", err)
continue
diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go
new file mode 100644
index 00000000..21053940
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/requeue.go
@@ -0,0 +1,24 @@
+package beanstalk
+
+import "context"
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ err := j.handleItem(context.TODO(), item)
+ if err != nil {
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 043da118..050d74b9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -31,9 +31,11 @@ type JobConsumer struct {
pq priorityqueue.Queue
localPrefetch chan *Item
+ // time.sleep goroutines max number
goroutinesMaxNum uint64
- stopCh chan struct{}
+ requeueCh chan *Item
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -45,6 +47,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -61,6 +64,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// consume from the queue
go jb.consume()
+ jb.requeueListener()
return jb, nil
}
@@ -72,6 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// initialize a local queue
@@ -79,6 +84,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
// consume from the queue
go jb.consume()
+ jb.requeueListener()
return jb, nil
}
@@ -87,45 +93,54 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok {
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
- msg := fromJob(jb)
- // handle timeouts
- // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
+ b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ }
- go func(jj *job.Job) {
- atomic.AddUint64(&j.goroutinesMaxNum, 1)
- time.Sleep(jj.Options.DelayDuration())
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
+ }
- // send the item after timeout expired
- j.localPrefetch <- msg
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
- atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
- }(jb)
+ return nil
+}
- return nil
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+ // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
+ return errors.E(op, errors.Str("max concurrency number reached"))
}
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- default:
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch))
- }
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutinesMaxNum, 1)
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
+ }(msg)
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
}
func (j *JobConsumer) consume() {
@@ -133,6 +148,9 @@ func (j *JobConsumer) consume() {
for {
select {
case item := <-j.localPrefetch:
+
+ // set requeue channel
+ item.Options.requeueCh = j.requeueCh
j.pq.Insert(item)
case <-j.stopCh:
return
@@ -140,7 +158,7 @@ func (j *JobConsumer) consume() {
}
}
-func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
const op = errors.Op("ephemeral_register")
if _, ok := j.pipeline.Load(pipeline.Name()); ok {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
@@ -151,12 +169,14 @@ func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline)
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, pipeline string) {
+func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
if q, ok := j.pipeline.Load(pipeline); ok {
if q == true {
// mark pipeline as turned off
j.pipeline.Store(pipeline, false)
}
+ // if not true - do not send the EventPipeStopped, because pipe already stopped
+ return
}
j.eh.Push(events.JobEvent{
@@ -167,12 +187,15 @@ func (j *JobConsumer) Pause(ctx context.Context, pipeline string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, pipeline string) {
+func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
if q, ok := j.pipeline.Load(pipeline); ok {
if q == false {
// mark pipeline as turned on
j.pipeline.Store(pipeline, true)
}
+
+ // if not true - do not send the EventPipeActive, because pipe already active
+ return
}
j.eh.Push(events.JobEvent{
@@ -184,7 +207,7 @@ func (j *JobConsumer) Resume(ctx context.Context, pipeline string) {
}
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
@@ -194,7 +217,8 @@ func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *JobConsumer) Stop(ctx context.Context) error {
+ const op = errors.Op("ephemeral_plugin_stop")
var pipe string
j.pipeline.Range(func(key, _ interface{}) bool {
pipe = key.(string)
@@ -202,15 +226,18 @@ func (j *JobConsumer) Stop(context.Context) error {
return true
})
+ select {
// return from the consumer
- j.stopCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe,
- Start: time.Now(),
- Elapsed: 0,
- })
+ case j.stopCh <- struct{}{}:
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ return nil
- return nil
+ case <-ctx.Done():
+ return errors.E(op, ctx.Err())
+ }
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 8560f10a..d140c9ed 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -8,20 +8,6 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
- },
- }
-}
-
type Item struct {
// Job contains name of job broker (usually PHP class).
Job string `json:"job"`
@@ -53,6 +39,9 @@ type Options struct {
// Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -111,6 +100,33 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ int64) error {
+func (i *Item) Requeue(delay int64) error {
+ go func() {
+ time.Sleep(time.Second * time.Duration(delay))
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return
+ default:
+ // TODO(rustatian): logs?
+ return
+ }
+ }()
+
return nil
}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
+ }
+}
diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go
new file mode 100644
index 00000000..afb97d54
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/requeue.go
@@ -0,0 +1,25 @@
+package ephemeral
+
+import "context"
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ // TODO(rustatian): what timeout to use?
+ err := j.handleItem(context.TODO(), item)
+ if err != nil {
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index b81d08e5..8d93b12c 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -50,7 +50,8 @@ type JobConsumer struct {
client *sqs.Client
queueURL *string
- pauseCh chan struct{}
+ requeueCh chan *Item
+ pauseCh chan struct{}
}
func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -102,6 +103,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -136,6 +138,8 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
+ jb.requeueListener()
+
return jb, nil
}
@@ -201,6 +205,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -235,6 +240,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
+ jb.requeueListener()
+
return jb, nil
}
@@ -254,13 +261,19 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- msg := fromJob(jb)
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
_, err := j.client.SendMessage(ctx, msg.pack(j.queueURL))
if err != nil {
- return errors.E(op, err)
+ return err
}
return nil
@@ -310,7 +323,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index a3039d1b..ea4ac8b7 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -60,23 +60,12 @@ type Options struct {
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
- // Maximum number of attempts to receive and process the message
- MaxAttempts int64 `json:"max_attempts,omitempty"`
-
// Private ================
approxReceiveCount int64
queue *string
receiptHandler *string
client *sqs.Client
-}
-
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry() bool {
- // Attempts 1 and 0 has identical effect
- if o.MaxAttempts == 0 || o.MaxAttempts == 1 {
- return false
- }
- return o.MaxAttempts > (o.approxReceiveCount + 1)
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -140,10 +129,6 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- if i.Options.CanRetry() {
- return nil
- }
-
_, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
@@ -156,8 +141,15 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ int64) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
func fromJob(job *job.Job) *Item {
@@ -167,11 +159,10 @@ func fromJob(job *job.Job) *Item {
Payload: job.Payload,
Headers: job.Headers,
Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
- MaxAttempts: job.Options.Attempts,
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
},
}
}
@@ -182,16 +173,15 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput {
QueueUrl: queue,
DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
- job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
- job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))},
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
},
}
}
-func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) {
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -204,11 +194,6 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
}
}
- attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
if err != nil {
return nil, errors.E(op, err)
@@ -233,16 +218,16 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
Job: *msg.MessageAttributes[job.RRJob].StringValue,
Payload: *msg.Body,
Options: &Options{
- Delay: int64(delay),
- Timeout: int64(to),
- Priority: int64(priority),
- MaxAttempts: int64(attempt),
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
// private
approxReceiveCount: int64(recCount),
- client: client,
- queue: queue,
+ client: j.client,
+ queue: j.queue,
receiptHandler: msg.ReceiptHandle,
+ requeueCh: j.requeueCh,
},
}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index e2323fa3..b72ac065 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -64,7 +64,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := unpack(&m, j.queueURL, j.client)
+ item, err := j.unpack(&m)
if err != nil {
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: j.queueURL,
@@ -78,27 +78,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
continue
}
- // No retry
- if item.Options.MaxAttempts == 0 {
- j.pq.Insert(item)
- continue
- }
-
- // MaxAttempts option specified
- if item.Options.CanRetry() {
- j.pq.Insert(item)
- continue
- }
-
- // If MaxAttempts is more than 0, and can't retry -> delete the message
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
- ReceiptHandle: m.ReceiptHandle,
- })
- if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
- continue
- }
+ j.pq.Insert(item)
}
}
}
diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go
new file mode 100644
index 00000000..87e885e0
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/requeue.go
@@ -0,0 +1,25 @@
+package sqs
+
+import "context"
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ // TODO(rustatian): what context to use
+ err := j.handleItem(context.TODO(), item)
+ if err != nil {
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+ }
+ }
+ }()
+}