summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go108
-rw-r--r--plugins/jobs/drivers/amqp/item.go37
-rw-r--r--plugins/jobs/drivers/amqp/requeue.go34
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go18
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go27
-rw-r--r--plugins/jobs/drivers/beanstalk/requeue.go24
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go4
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go9
-rw-r--r--plugins/jobs/drivers/sqs/item.go30
-rw-r--r--plugins/jobs/drivers/sqs/requeue.go25
10 files changed, 127 insertions, 189 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index d7425858..429953e1 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -54,7 +54,6 @@ type JobConsumer struct {
listeners uint32
stopCh chan struct{}
- requeueCh chan *Item
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -112,7 +111,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
exclusive: pipeCfg.Exclusive,
multipleAck: pipeCfg.MultipleAck,
requeueOnFail: pipeCfg.RequeueOnFail,
- requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -137,7 +135,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// run redialer and requeue listener for the connection
jb.redialer()
- jb.requeueListener()
return jb, nil
}
@@ -184,7 +181,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exclusive: pipeline.Bool(exclusive, false),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
- requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -213,7 +209,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// run redialer for the connection
jb.redialer()
- jb.requeueListener()
return jb, nil
}
@@ -228,9 +223,17 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
}
- // lock needed here to protect redial concurrent operation
- // we may be in the redial state here
+ err := j.handleItem(ctx, fromJob(job))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
select {
case pch := <-j.publishChan:
// return the channel back
@@ -239,40 +242,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
}()
// convert
- msg := fromJob(job)
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- err = j.handleItem(msg, p, pch)
+ table, err := pack(msg.ID(), msg)
if err != nil {
return errors.E(op, err)
}
- return nil
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
-// handleItem
-func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error {
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
@@ -284,29 +282,16 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel)
return errors.E(op, err)
}
- return nil
- }
-
- _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
+ j.delayCache[tmpQ] = struct{}{}
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
+ return nil
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
- Timestamp: time.Now().UTC(),
+ Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
@@ -315,25 +300,10 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel)
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
}
-
- // insert to the local, limited pipeline
- err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -491,8 +461,6 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
- close(j.requeueCh)
-
pipe := j.pipeline.Load().(*pipeline.Pipeline)
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 908dbd15..f252acd8 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "context"
+ "fmt"
"time"
json "github.com/json-iterator/go"
@@ -52,7 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
multipleAsk bool
requeue bool
@@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errAck := i.Options.nack(false, true)
+ if errAck != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ }
+
+ return err
}
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- // requeue channel
- item.Options.requeueCh = j.requeueCh
+
+ // requeue func
+ item.Options.requeueFn = j.handleItem
return i, nil
}
@@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go
deleted file mode 100644
index a2b3b26c..00000000
--- a/plugins/jobs/drivers/amqp/requeue.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package amqp
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- pch := <-j.publishChan
-
- headers, err := pack(item.ID(), item)
- if err != nil {
- j.publishChan <- pch
- j.log.Error("requeue pack", "error", err)
- continue
- }
-
- err = j.handleItem(item, headers, pch)
- if err != nil {
- j.publishChan <- pch
- j.log.Error("requeue handle item", "error", err)
- continue
- }
-
- j.publishChan <- pch
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 21b05b16..f41a2c8a 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
var pipeCfg Config
var globalCfg GlobalCfg
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(configKey, &pipeCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
@@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// PARSE CONFIGURATION -------
var globalCfg GlobalCfg
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(pluginName, &globalCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index a5aa1791..47336b43 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -2,12 +2,12 @@ package beanstalk
import (
"bytes"
+ "context"
"encoding/gob"
"time"
"github.com/beanstalkd/go-beanstalk"
json "github.com/json-iterator/go"
- "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -53,7 +53,7 @@ type Options struct {
// Private ================
id uint64
conn *beanstalk.Conn
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // delete old job
+ err = i.Options.conn.Delete(i.Options.id)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
}
out.Options.conn = j.pool.conn
out.Options.id = id
- out.Options.requeueCh = j.requeueCh
+ out.Options.requeueFn = j.handleItem
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go
deleted file mode 100644
index 21053940..00000000
--- a/plugins/jobs/drivers/beanstalk/requeue.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index c1171ae2..9fab8d24 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -118,6 +118,10 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+func (i *Item) Recycle() {
+ i.Options = nil
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 8d93b12c..5d741358 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -50,8 +50,7 @@ type JobConsumer struct {
client *sqs.Client
queueURL *string
- requeueCh chan *Item
- pauseCh chan struct{}
+ pauseCh chan struct{}
}
func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
- jb.requeueListener()
-
return jb, nil
}
@@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
- jb.requeueListener()
-
return jb, nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index a761d6bd..f5fac0b3 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -64,7 +64,7 @@ type Options struct {
queue *string
receiptHandler *string
client *sqs.Client
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -144,12 +144,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ // requeue message
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // Delete job from the queue only after successful requeue
+ _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -227,7 +243,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
client: j.client,
queue: j.queue,
receiptHandler: msg.ReceiptHandle,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
},
}
diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go
deleted file mode 100644
index 87e885e0..00000000
--- a/plugins/jobs/drivers/sqs/requeue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package sqs
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- // TODO(rustatian): what context to use
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}