diff options
author | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
commit | b2da831f47284974551710d2767a7bdde0efa51d (patch) | |
tree | 7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/drivers/amqp | |
parent | 50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff) |
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation.
Complete redia for the beanstalk.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 16 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 4 |
2 files changed, 17 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 7c300c88..6b912dde 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -82,7 +82,21 @@ func (j *Item) Body() []byte { // Context packs job context (job, id) into binary payload. // Not used in the amqp, amqp.Table used instead func (j *Item) Context() ([]byte, error) { - return nil, nil + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil } func (j *Item) Ack() error { diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index 4f04484e..532aadb4 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -38,7 +38,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) expb.MaxElapsedTime = j.retryTimeout - op := func() error { + operation := func() error { j.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) @@ -90,7 +90,7 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit return nil } - retryErr := backoff.Retry(op, expb) + retryErr := backoff.Retry(operation, expb) if retryErr != nil { j.Unlock() j.log.Error("backoff failed", "error", retryErr) |