diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/headers.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/headers.go | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/plugins/jobs/brokers/amqp/headers.go b/plugins/jobs/brokers/amqp/headers.go new file mode 100644 index 00000000..b1f9c89d --- /dev/null +++ b/plugins/jobs/brokers/amqp/headers.go @@ -0,0 +1,68 @@ +package amqp + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/streadway/amqp" +) + +const ( + rrID string = "rr-id" + rrJob string = "rr-job" + rrAttempt string = "rr-attempt" + rrMaxAttempts string = "rr-max_attempts" + rrTimeout string = "rr-timeout" + rrDelay string = "rr-delay" + rrRetryDelay string = "rr-retry_delay" +) + +// pack job metadata into headers +func pack(id string, attempt uint64, j *structs.Job) amqp.Table { + return amqp.Table{ + rrID: id, + rrJob: j.Job, + rrAttempt: attempt, + rrMaxAttempts: j.Options.Attempts, + rrTimeout: j.Options.Timeout, + rrDelay: j.Options.Delay, + rrRetryDelay: j.Options.RetryDelay, + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *structs.Job, err error) { //nolint:deadcode,unused + j = &structs.Job{Payload: string(d.Body), Options: &structs.Options{}} + + if _, ok := d.Headers[rrID].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrID) + } + + if _, ok := d.Headers[rrAttempt].(uint64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) + } + + if _, ok := d.Headers[rrJob].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob) + } + + j.Job = d.Headers[rrJob].(string) + + if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { + j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) + } + + if _, ok := d.Headers[rrTimeout].(uint64); ok { + j.Options.Timeout = d.Headers[rrTimeout].(uint64) + } + + if _, ok := d.Headers[rrDelay].(uint64); ok { + j.Options.Delay = d.Headers[rrDelay].(uint64) + } + + if _, ok := d.Headers[rrRetryDelay].(uint64); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64) + } + + return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil +} |