summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r--plugins/jobs/brokers/amqp/item.go86
1 files changed, 85 insertions, 1 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 4751df58..190e72e8 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -1,20 +1,54 @@
package amqp
import (
+ "fmt"
"time"
json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/utils"
"github.com/streadway/amqp"
)
-func From(d amqp.Delivery) *Item {
+const (
+ rrID string = "rr-id"
+ rrJob string = "rr-job"
+ rrAttempt string = "rr-attempt"
+ rrMaxAttempts string = "rr-max_attempts"
+ rrTimeout string = "rr-timeout"
+ rrDelay string = "rr-delay"
+ rrRetryDelay string = "rr-retry_delay"
+)
+
+func FromDelivery(d amqp.Delivery) *Item {
+ id, _, item, err := unpack(d)
+ if err != nil {
+ panic(err)
+ }
return &Item{
+ Job: item.Job,
+ Ident: id,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
AckFunc: d.Ack,
NackFunc: d.Nack,
}
}
+func FromJob(job *structs.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: conv(*job.Options),
+ }
+}
+
+func conv(jo structs.Options) Options {
+ return Options(jo)
+}
+
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
@@ -127,3 +161,53 @@ func (j *Item) Ack() error {
func (j *Item) Nack() error {
return j.NackFunc(false, false)
}
+
+// pack job metadata into headers
+func pack(id string, attempt uint64, j *Item) amqp.Table {
+ return amqp.Table{
+ rrID: id,
+ rrJob: j.Job,
+ rrAttempt: attempt,
+ rrMaxAttempts: j.Options.Attempts,
+ rrTimeout: j.Options.Timeout,
+ rrDelay: j.Options.Delay,
+ rrRetryDelay: j.Options.RetryDelay,
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(d amqp.Delivery) (id string, attempt int, j *Item, err error) {
+ j = &Item{Payload: string(d.Body), Options: Options{}}
+
+ if _, ok := d.Headers[rrID].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
+ }
+
+ if _, ok := d.Headers[rrAttempt].(uint64); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
+ }
+
+ if _, ok := d.Headers[rrJob].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
+ }
+
+ j.Job = d.Headers[rrJob].(string)
+
+ if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
+ j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
+ }
+
+ if _, ok := d.Headers[rrTimeout].(uint64); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(uint64)
+ }
+
+ if _, ok := d.Headers[rrDelay].(uint64); ok {
+ j.Options.Delay = d.Headers[rrDelay].(uint64)
+ }
+
+ if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
+ }
+
+ return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
+}