summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r--plugins/jobs/brokers/amqp/item.go99
1 files changed, 47 insertions, 52 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index e5e580e0..06e2bf56 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -12,13 +12,13 @@ import (
)
const (
- rrID string = "rr-id"
- rrJob string = "rr-job"
- // rrAttempt string = "rr-attempt"
- // rrMaxAttempts string = "rr-max_attempts"
- rrTimeout string = "rr-timeout"
- rrDelay string = "rr-delay"
- rrRetryDelay string = "rr-retry_delay"
+ rrID string = "rr_id"
+ rrJob string = "rr_job"
+ rrHeaders string = "rr_headers"
+ rrPipeline string = "rr_pipeline"
+ rrTimeout string = "rr_timeout"
+ rrDelay string = "rr_delay"
+ rrRetryDelay string = "rr_retry_delay"
)
func FromDelivery(d amqp.Delivery) (*Item, error) {
@@ -44,12 +44,12 @@ func FromJob(job *structs.Job) *Item {
Ident: job.Ident,
Payload: job.Payload,
Options: &Options{
- Priority: job.Options.Priority,
+ Priority: uint32(job.Options.Priority),
Pipeline: job.Options.Pipeline,
- Delay: int64(job.Options.Delay),
- Attempts: int64(job.Options.Attempts),
- RetryDelay: int64(job.Options.RetryDelay),
- Timeout: int64(job.Options.Timeout),
+ Delay: int32(job.Options.Delay),
+ Attempts: int32(job.Options.Attempts),
+ RetryDelay: int32(job.Options.RetryDelay),
+ Timeout: int32(job.Options.Timeout),
},
}
}
@@ -84,27 +84,27 @@ type Item struct {
type Options struct {
// Priority is job priority, default - 10
// pointer to distinguish 0 as a priority and nil as priority not set
- Priority uint64 `json:"priority"`
+ Priority uint32 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
+ Delay int32 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts int64 `json:"maxAttempts,omitempty"`
+ Attempts int32 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int64 `json:"retryDelay,omitempty"`
+ RetryDelay int32 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int64 `json:"timeout,omitempty"`
+ Timeout int32 `json:"timeout,omitempty"`
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int64) bool {
+func (o *Options) CanRetry(attempt int32) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
@@ -133,7 +133,7 @@ func (j *Item) ID() string {
}
func (j *Item) Priority() uint64 {
- return j.Options.Priority
+ return uint64(j.Options.Priority)
}
// Body packs job payload into binary payload.
@@ -142,21 +142,9 @@ func (j *Item) Body() []byte {
}
// Context packs job context (job, id) into binary payload.
+// Not used in the amqp, amqp.Table used instead
func (j *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Timeout int64 `json:"timeout"`
- Pipeline string `json:"pipeline"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
- )
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
+ return nil, nil
}
func (j *Item) Ack() error {
@@ -168,16 +156,20 @@ func (j *Item) Nack() error {
}
// pack job metadata into headers
-func pack(id string, j *Item) amqp.Table {
+func pack(id string, j *Item) (amqp.Table, error) {
+ headers, err := json.Marshal(j.Headers)
+ if err != nil {
+ return nil, err
+ }
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- // rrAttempt: attempt,
- // rrMaxAttempts: j.Options.Attempts,
+ rrID: id,
+ rrJob: j.Job,
+ rrPipeline: j.Options.Pipeline,
+ rrHeaders: headers,
rrTimeout: j.Options.Timeout,
rrDelay: j.Options.Delay,
rrRetryDelay: j.Options.RetryDelay,
- }
+ }, nil
}
// unpack restores jobs.Options
@@ -188,30 +180,33 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
return "", nil, fmt.Errorf("missing header `%s`", rrID)
}
- // if _, ok := d.Headers[rrAttempt].(uint64); !ok {
- // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
- // }
-
if _, ok := d.Headers[rrJob].(string); !ok {
return "", nil, fmt.Errorf("missing header `%s`", rrJob)
}
j.Job = d.Headers[rrJob].(string)
- // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
- // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
- // }
+ if _, ok := d.Headers[rrPipeline].(string); ok {
+ j.Options.Pipeline = d.Headers[rrPipeline].(string)
+ }
+
+ if h, ok := d.Headers[rrHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &j.Headers)
+ if err != nil {
+ return "", nil, err
+ }
+ }
- if _, ok := d.Headers[rrTimeout].(int64); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(int64)
+ if _, ok := d.Headers[rrTimeout].(int32); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(int32)
}
- if _, ok := d.Headers[rrDelay].(int64); ok {
- j.Options.Delay = d.Headers[rrDelay].(int64)
+ if _, ok := d.Headers[rrDelay].(int32); ok {
+ j.Options.Delay = d.Headers[rrDelay].(int32)
}
- if _, ok := d.Headers[rrRetryDelay].(int64); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64)
+ if _, ok := d.Headers[rrRetryDelay].(int32); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
}
return d.Headers[rrID].(string), j, nil