diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/job.go')
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/job.go | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go new file mode 100644 index 00000000..bd559715 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/job.go @@ -0,0 +1,56 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" +) + +// pack job metadata into headers +func pack(id string, attempt int, j *jobs.Job) amqp.Table { + return amqp.Table{ + "rr-id": id, + "rr-job": j.Job, + "rr-attempt": int64(attempt), + "rr-maxAttempts": int64(j.Options.Attempts), + "rr-timeout": int64(j.Options.Timeout), + "rr-delay": int64(j.Options.Delay), + "rr-retryDelay": int64(j.Options.RetryDelay), + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { + j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} + + if _, ok := d.Headers["rr-id"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") + } + + if _, ok := d.Headers["rr-attempt"].(int64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") + } + + if _, ok := d.Headers["rr-job"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") + } + j.Job = d.Headers["rr-job"].(string) + + if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { + j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) + } + + if _, ok := d.Headers["rr-timeout"].(int64); ok { + j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) + } + + if _, ok := d.Headers["rr-delay"].(int64); ok { + j.Options.Delay = int(d.Headers["rr-delay"].(int64)) + } + + if _, ok := d.Headers["rr-retryDelay"].(int64); ok { + j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) + } + + return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil +} |