summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
committerValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
commitb2da831f47284974551710d2767a7bdde0efa51d (patch)
tree7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/drivers/amqp
parent50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff)
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation. Complete redia for the beanstalk. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r--plugins/jobs/drivers/amqp/item.go16
-rw-r--r--plugins/jobs/drivers/amqp/redial.go4
2 files changed, 17 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 7c300c88..6b912dde 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -82,7 +82,21 @@ func (j *Item) Body() []byte {
// Context packs job context (job, id) into binary payload.
// Not used in the amqp, amqp.Table used instead
func (j *Item) Context() ([]byte, error) {
- return nil, nil
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
}
func (j *Item) Ack() error {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index 4f04484e..532aadb4 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -38,7 +38,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
expb.MaxElapsedTime = j.retryTimeout
- op := func() error {
+ operation := func() error {
j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
var dialErr error
j.conn, dialErr = amqp.Dial(j.connStr)
@@ -90,7 +90,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
return nil
}
- retryErr := backoff.Retry(op, expb)
+ retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
j.Unlock()
j.log.Error("backoff failed", "error", retryErr)