author     Valery Piashchynski <[email protected]>   2021-08-11 22:03:34 +0300
committer  Valery Piashchynski <[email protected]>   2021-08-11 22:03:34 +0300
commit     2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
tree       d796a11941fab4be668843a3fcbd83ea0859db39 /plugins
parent     e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algorithm. Update the AMQP and Beanstalk tests to use a mock logger. Fix bugs discovered during testing.

Signed-off-by: Valery Piashchynski <[email protected]>
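The core of the change: the per-driver requeue machinery (a buffered requeueCh chan *Item plus a background requeueListener() goroutine, both deleted below) is replaced by a direct callback. Each Item's Options now carries requeueFn func(context.Context, *Item) error, wired to the driver's handleItem when a delivery is unpacked, and Requeue() re-publishes the job synchronously, acknowledging (AMQP) or deleting (Beanstalk, SQS) the original message only after the re-publish succeeds. A requeued job can therefore no longer be dropped because the channel was full or closed. A condensed, self-contained sketch of the shared pattern, modelled on the AMQP variant in this diff (type shapes trimmed to what the sketch needs):

    package sketch

    import (
        "context"
        "fmt"
    )

    // Condensed shapes; the real Item/Options types live in each driver package.
    type Item struct {
        Headers map[string][]string
        Options *Options
    }

    type Options struct {
        Delay     int64
        requeueFn func(context.Context, *Item) error // points at the driver's handleItem
        ack       func(multiple bool) error
        nack      func(multiple bool, requeue bool) error
    }

    // Requeue re-publishes first and acknowledges second (the AMQP flavour is shown).
    func (i *Item) Requeue(headers map[string][]string, delay int64) error {
        i.Options.Delay = delay
        i.Headers = headers

        // 1. re-publish the job through the driver
        if err := i.Options.requeueFn(context.Background(), i); err != nil {
            // hand the delivery back to the broker rather than losing it
            if errNack := i.Options.nack(false, true); errNack != nil {
                return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack)
            }
            return err
        }

        // 2. only after a successful re-publish acknowledge the original message
        return i.Options.ack(false)
    }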
Diffstat (limited to 'plugins')
-rw-r--r--  plugins/jobs/drivers/amqp/consumer.go        108
-rw-r--r--  plugins/jobs/drivers/amqp/item.go             37
-rw-r--r--  plugins/jobs/drivers/amqp/requeue.go          34
-rw-r--r--  plugins/jobs/drivers/beanstalk/consumer.go    18
-rw-r--r--  plugins/jobs/drivers/beanstalk/item.go        27
-rw-r--r--  plugins/jobs/drivers/beanstalk/requeue.go     24
-rw-r--r--  plugins/jobs/drivers/ephemeral/item.go         4
-rw-r--r--  plugins/jobs/drivers/sqs/consumer.go           9
-rw-r--r--  plugins/jobs/drivers/sqs/item.go              30
-rw-r--r--  plugins/jobs/drivers/sqs/requeue.go           25
-rw-r--r--  plugins/jobs/plugin.go                        28
-rw-r--r--  plugins/jobs/protocol.go                      11
-rw-r--r--  plugins/jobs/response_protocol.md             13
13 files changed, 155 insertions, 213 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index d7425858..429953e1 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -54,7 +54,6 @@ type JobConsumer struct {
listeners uint32
stopCh chan struct{}
- requeueCh chan *Item
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -112,7 +111,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
exclusive: pipeCfg.Exclusive,
multipleAck: pipeCfg.MultipleAck,
requeueOnFail: pipeCfg.RequeueOnFail,
- requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -137,7 +135,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// run redialer and requeue listener for the connection
jb.redialer()
- jb.requeueListener()
return jb, nil
}
@@ -184,7 +181,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exclusive: pipeline.Bool(exclusive, false),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
- requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -213,7 +209,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// run redialer for the connection
jb.redialer()
- jb.requeueListener()
return jb, nil
}
@@ -228,9 +223,17 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
}
- // lock needed here to protect redial concurrent operation
- // we may be in the redial state here
+ err := j.handleItem(ctx, fromJob(job))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
select {
case pch := <-j.publishChan:
// return the channel back
@@ -239,40 +242,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
}()
// convert
- msg := fromJob(job)
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- err = j.handleItem(msg, p, pch)
+ table, err := pack(msg.ID(), msg)
if err != nil {
return errors.E(op, err)
}
- return nil
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
-// handleItem
-func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error {
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
@@ -284,29 +282,16 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel)
return errors.E(op, err)
}
- return nil
- }
-
- _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
+ j.delayCache[tmpQ] = struct{}{}
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
+ return nil
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
- Timestamp: time.Now().UTC(),
+ Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
@@ -315,25 +300,10 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel)
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
}
-
- // insert to the local, limited pipeline
- err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -491,8 +461,6 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
- close(j.requeueCh)
-
pipe := j.pipeline.Load().(*pipeline.Pipeline)
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
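For delayed jobs, handleItem now declares a per-TTL holding queue named delayed-<ms>.<exchange>.<queue> with dead-letter arguments pointing back at the main exchange and routing key, binds it, publishes into it, and records the name in j.delayCache. Once the per-message TTL expires, RabbitMQ dead-letters the message onto the main routing key, where the regular consumer picks it up. The dlx* keys are driver constants whose values are not shown in this diff; the comments below assume they are the standard RabbitMQ queue arguments.

    // Inside handleItem, for msg.Options.DelayDuration() > 0. The x-* names in the
    // comments are assumptions; the diff only shows the dlx* constants.
    delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
    tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)

    _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
        dlx:           j.exchangeName, // presumably "x-dead-letter-exchange"
        dlxRoutingKey: j.routingKey,   // presumably "x-dead-letter-routing-key"
        dlxTTL:        delayMs,        // presumably "x-message-ttl": the requested delay
        dlxExpires:    delayMs * 2,    // presumably "x-expires": drop the idle holding queue
    })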
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 908dbd15..f252acd8 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "context"
+ "fmt"
"time"
json "github.com/json-iterator/go"
@@ -52,7 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
multipleAsk bool
requeue bool
@@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errAck := i.Options.nack(false, true)
+ if errAck != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ }
+
+ return err
}
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- // requeue channel
- item.Options.requeueCh = j.requeueCh
+
+ // requeue func
+ item.Options.requeueFn = j.handleItem
return i, nil
}
@@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go
deleted file mode 100644
index a2b3b26c..00000000
--- a/plugins/jobs/drivers/amqp/requeue.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package amqp
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- pch := <-j.publishChan
-
- headers, err := pack(item.ID(), item)
- if err != nil {
- j.publishChan <- pch
- j.log.Error("requeue pack", "error", err)
- continue
- }
-
- err = j.handleItem(item, headers, pch)
- if err != nil {
- j.publishChan <- pch
- j.log.Error("requeue handle item", "error", err)
- continue
- }
-
- j.publishChan <- pch
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 21b05b16..f41a2c8a 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
var pipeCfg Config
var globalCfg GlobalCfg
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(configKey, &pipeCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
@@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// PARSE CONFIGURATION -------
var globalCfg GlobalCfg
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(pluginName, &globalCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index a5aa1791..47336b43 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -2,12 +2,12 @@ package beanstalk
import (
"bytes"
+ "context"
"encoding/gob"
"time"
"github.com/beanstalkd/go-beanstalk"
json "github.com/json-iterator/go"
- "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -53,7 +53,7 @@ type Options struct {
// Private ================
id uint64
conn *beanstalk.Conn
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // delete old job
+ err = i.Options.conn.Delete(i.Options.id)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
}
out.Options.conn = j.pool.conn
out.Options.id = id
- out.Options.requeueCh = j.requeueCh
+ out.Options.requeueFn = j.handleItem
return nil
}
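Beanstalk follows the same pattern, with deletion instead of acknowledgement: Requeue puts a fresh copy of the job through requeueFn (wired to j.handleItem in unpack) and only then deletes the original job via conn.Delete(id). handleItem itself is not part of this diff; the outline below is a hypothetical sketch of what it presumably does with the go-beanstalk client. The tube field name j.tName, the priority and the TTR are placeholders, and the gob encoding is inferred from the imports above.

    // Hypothetical outline of the Beanstalk handleItem target (not in this diff).
    func (j *JobConsumer) handleItem(_ context.Context, item *Item) error {
        // the item travels as a gob-encoded body (encoding/gob is imported above)
        var buf bytes.Buffer
        if err := gob.NewEncoder(&buf).Encode(item); err != nil {
            return err
        }

        // placeholder tube name, priority and TTR; real values come from the pipeline config
        tube := beanstalk.NewTube(j.pool.conn, j.tName)
        _, err := tube.Put(buf.Bytes(), 1, item.Options.DelayDuration(), 30*time.Second)
        return err
    }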
diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go
deleted file mode 100644
index 21053940..00000000
--- a/plugins/jobs/drivers/beanstalk/requeue.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index c1171ae2..9fab8d24 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -118,6 +118,10 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+func (i *Item) Recycle() {
+ i.Options = nil
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 8d93b12c..5d741358 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -50,8 +50,7 @@ type JobConsumer struct {
client *sqs.Client
queueURL *string
- requeueCh chan *Item
- pauseCh chan struct{}
+ pauseCh chan struct{}
}
func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
- jb.requeueListener()
-
return jb, nil
}
@@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
- jb.requeueListener()
-
return jb, nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index a761d6bd..f5fac0b3 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -64,7 +64,7 @@ type Options struct {
queue *string
receiptHandler *string
client *sqs.Client
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -144,12 +144,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ // requeue message
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // Delete job from the queue only after successful requeue
+ _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -227,7 +243,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
client: j.client,
queue: j.queue,
receiptHandler: msg.ReceiptHandle,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
},
}
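The SQS variant mirrors this, keyed on the receipt handle: Requeue re-sends the message through requeueFn and, only if that succeeds, removes the original message with DeleteMessage. The send side is outside this diff; below is a hypothetical sketch of the AWS SDK v2 call it presumably wraps, with the packing of job metadata into message attributes elided.

    // Hypothetical outline of the SQS handleItem target (not in this diff).
    func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
        _, err := j.client.SendMessage(ctx, &sqs.SendMessageInput{
            QueueUrl:     j.queueURL,
            MessageBody:  aws.String(item.Payload),
            DelaySeconds: int32(item.Options.Delay), // SQS caps the delay at 900 seconds
            // job metadata (ID, pipeline, headers, ...) would travel as MessageAttributes
        })
        return err
    }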
diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go
deleted file mode 100644
index 87e885e0..00000000
--- a/plugins/jobs/drivers/sqs/requeue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package sqs
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- // TODO(rustatian): what context to use
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 8ea18cfd..87559034 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -104,19 +104,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
-func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
- pld := p.pldPool.Get().(*payload.Payload)
- pld.Body = body
- pld.Context = context
- return pld
-}
-
-func (p *Plugin) putPayload(pld *payload.Payload) {
- pld.Body = nil
- pld.Context = nil
- p.pldPool.Put(pld)
-}
-
func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
@@ -261,6 +248,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ // free the resources
+ jb.Recycle()
// return payload
p.putPayload(exec)
}
@@ -565,3 +554,16 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
}
}
}
+
+func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
+ pld := p.pldPool.Get().(*payload.Payload)
+ pld.Body = body
+ pld.Context = context
+ return pld
+}
+
+func (p *Plugin) putPayload(pld *payload.Payload) {
+ pld.Body = nil
+ pld.Context = nil
+ p.pldPool.Put(pld)
+}
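getPayload and putPayload move below collectJobsEvents unchanged; they recycle payload.Payload values through p.pldPool, and the consume loop now also calls jb.Recycle() so a finished item drops its broker-bound Options before the payload is returned to the pool. The pool's construction is not part of this diff; presumably it is a sync.Pool along these lines:

    // assumed initialization of p.pldPool (not shown in this diff)
    p.pldPool = sync.Pool{
        New: func() interface{} {
            return &payload.Payload{}
        },
    }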
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index 691369d0..9d769fdf 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -10,8 +10,8 @@ import (
type Type uint32
const (
- Error Type = iota
- NoError
+ NoError Type = iota
+ Error
)
// internal worker protocol (jobs mode)
@@ -19,7 +19,7 @@ type protocol struct {
// message type, see Type
T Type `json:"type"`
// Payload
- Data []byte `json:"data"`
+ Data json.RawMessage `json:"data"`
}
type errorResp struct {
@@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
return errors.E(op, err)
}
- log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+ log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
if er.Requeue {
err = jb.Requeue(er.Headers, er.Delay)
@@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
}
return nil
}
+
+ return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
+
default:
err = jb.Ack()
if err != nil {
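Three protocol fixes land here: the Type constants are flipped so the zero value means NoError, Data becomes json.RawMessage so the payload is decoded only by the branch that needs it, and a worker error that does not request a requeue now propagates as an error instead of being silently logged. A condensed sketch of the dispatch in handleResponse, using only the names visible in this diff (logging and some error wrapping omitted):

    var p protocol
    if err := json.Unmarshal(resp, &p); err != nil {
        return errors.E(op, err)
    }

    switch p.T {
    case Error: // type: 1
        var er errorResp
        // p.Data is json.RawMessage, so the error payload is decoded lazily, only here
        if err := json.Unmarshal(p.Data, &er); err != nil {
            return errors.E(op, err)
        }
        if er.Requeue {
            return jb.Requeue(er.Headers, er.Delay) // the driver re-publishes, then acks/deletes
        }
        return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
    default: // NoError, type: 0
        return jb.Ack()
    }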
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index 77c78cb8..c89877e3 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,8 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value.
+ `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+ with string key and array of strings as a value.
For example:
@@ -41,12 +42,10 @@ For example:
"headers": [
{
"test": [
- {
- "ttt": "11",
- "ggg": "22"
- }
- ],
- "test2": "2"
+ "1",
+ "2",
+ "3"
+ ]
}
],
"delay_seconds": 10