summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-10 19:54:03 +0300
committerValery Piashchynski <[email protected]>2021-08-10 19:54:03 +0300
commita8a7f4194156440ef3157d8e5d75c43ed0327bcf (patch)
tree9bc4240fb3c6f02682420689490f56d681d4b545 /plugins/jobs/drivers
parentd379c28a1e9babead0266bc4fa10d6c5e7aa14cb (diff)
Add jobs protocol support for the AMQP driver
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go118
-rw-r--r--plugins/jobs/drivers/amqp/item.go50
-rw-r--r--plugins/jobs/drivers/amqp/redial.go3
-rw-r--r--plugins/jobs/drivers/amqp/requeue.go34
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go2
-rw-r--r--plugins/jobs/drivers/sqs/item.go2
7 files changed, 139 insertions, 72 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 3ca5c742..36a16bcd 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -54,6 +54,7 @@ type JobConsumer struct {
listeners uint32
stopCh chan struct{}
+ requeueCh chan *Item
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -111,6 +112,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
exclusive: pipeCfg.Exclusive,
multipleAck: pipeCfg.MultipleAck,
requeueOnFail: pipeCfg.RequeueOnFail,
+ requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -133,8 +135,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
jb.publishChan <- pch
- // run redialer for the connection
+ // run redialer and requeue listener for the connection
jb.redialer()
+ jb.requeueListener()
return jb, nil
}
@@ -181,6 +184,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exclusive: pipeline.Bool(exclusive, true),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -209,6 +213,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// run redialer for the connection
jb.redialer()
+ jb.requeueListener()
return jb, nil
}
@@ -240,52 +245,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, err)
}
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: p,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- }
-
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
+ err = j.handleItem(msg, p, pch)
+ if err != nil {
+ return errors.E(op, err)
+ }
- if err != nil {
- return errors.E(op, err)
- }
+ return nil
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+// handleItem
+func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error {
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: p,
+ err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
@@ -296,28 +284,56 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
}
+ _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: p,
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
ContentType: contentType,
- Timestamp: time.Now(),
+ Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
+
if err != nil {
return errors.E(op, err)
}
+ j.delayCache[tmpQ] = struct{}{}
+
return nil
+ }
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
+ // insert to the local, limited pipeline
+ err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
}
+
+ return nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -475,6 +491,8 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
+ close(j.requeueCh)
+
pipe := j.pipeline.Load().(*pipeline.Pipeline)
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index a7e2c4e5..1a7ce00e 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -25,15 +25,6 @@ type Item struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
-
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
- AckFunc func(multiply bool) error
-
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
- // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
- // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
- // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
- NackFunc func(multiply bool, requeue bool) error
}
// Options carry information about how to handle given job.
@@ -52,6 +43,17 @@ type Options struct {
Timeout int64 `json:"timeout,omitempty"`
// private
+ // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ ack func(multiply bool) error
+
+ // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
+ // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
+ // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
+ nack func(multiply bool, requeue bool) error
+
+ requeueCh chan *Item
+
multipleAsk bool
requeue bool
}
@@ -104,16 +106,23 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- return i.AckFunc(i.Options.multipleAsk)
+ return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
- return i.NackFunc(false, i.Options.requeue)
+ return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
-func (i *Item) Requeue(_ uint32) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -123,16 +132,20 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
if err != nil {
return nil, errors.E(op, err)
}
- return &Item{
+
+ i := &Item{
Job: item.Job,
Ident: item.Ident,
Payload: item.Payload,
Headers: item.Headers,
Options: item.Options,
- // internal
- AckFunc: d.Ack,
- NackFunc: d.Nack,
- }, nil
+ }
+
+ item.Options.ack = d.Ack
+ item.Options.nack = d.Nack
+ // requeue channel
+ item.Options.requeueCh = j.requeueCh
+ return i, nil
}
func fromJob(job *job.Job) *Item {
@@ -172,6 +185,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
+ requeueCh: j.requeueCh,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index ef2a130a..8dc18b8f 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -24,7 +24,7 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
j.Lock()
- // trash the broken publish channel
+ // trash the broken publishing channel
<-j.publishChan
t := time.Now()
@@ -85,6 +85,7 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
return errors.E(op, err)
}
+ // put the fresh publishing channel
j.publishChan <- pch
// restart listener
j.listener(deliv)
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go
new file mode 100644
index 00000000..a2b3b26c
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/requeue.go
@@ -0,0 +1,34 @@
+package amqp
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ pch := <-j.publishChan
+
+ headers, err := pack(item.ID(), item)
+ if err != nil {
+ j.publishChan <- pch
+ j.log.Error("requeue pack", "error", err)
+ continue
+ }
+
+ err = j.handleItem(item, headers, pch)
+ if err != nil {
+ j.publishChan <- pch
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+
+ j.publishChan <- pch
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index eb532e08..7c792b46 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -103,7 +103,7 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
-func (i *Item) Requeue(_ uint32) error {
+func (i *Item) Requeue(_ int64) error {
return nil
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index ebf16524..8560f10a 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -111,6 +111,6 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ uint32) error {
+func (i *Item) Requeue(_ int64) error {
return nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 50d8ac18..a3039d1b 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -156,7 +156,7 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ uint32) error {
+func (i *Item) Requeue(_ int64) error {
return nil
}