summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp/job.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/job.go')
-rw-r--r--plugins/jobs/oooold/broker/amqp/job.go56
1 files changed, 56 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go
new file mode 100644
index 00000000..bd559715
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/job.go
@@ -0,0 +1,56 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/streadway/amqp"
+)
+
+// pack job metadata into headers
+func pack(id string, attempt int, j *jobs.Job) amqp.Table {
+ return amqp.Table{
+ "rr-id": id,
+ "rr-job": j.Job,
+ "rr-attempt": int64(attempt),
+ "rr-maxAttempts": int64(j.Options.Attempts),
+ "rr-timeout": int64(j.Options.Timeout),
+ "rr-delay": int64(j.Options.Delay),
+ "rr-retryDelay": int64(j.Options.RetryDelay),
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
+ j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}
+
+ if _, ok := d.Headers["rr-id"].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
+ }
+
+ if _, ok := d.Headers["rr-attempt"].(int64); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt")
+ }
+
+ if _, ok := d.Headers["rr-job"].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
+ }
+ j.Job = d.Headers["rr-job"].(string)
+
+ if _, ok := d.Headers["rr-maxAttempts"].(int64); ok {
+ j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64))
+ }
+
+ if _, ok := d.Headers["rr-timeout"].(int64); ok {
+ j.Options.Timeout = int(d.Headers["rr-timeout"].(int64))
+ }
+
+ if _, ok := d.Headers["rr-delay"].(int64); ok {
+ j.Options.Delay = int(d.Headers["rr-delay"].(int64))
+ }
+
+ if _, ok := d.Headers["rr-retryDelay"].(int64); ok {
+ j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64))
+ }
+
+ return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil
+}