diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 101 |
1 files changed, 53 insertions, 48 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 190e72e8..e5e580e0 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -5,25 +5,27 @@ import ( "time" json "github.com/json-iterator/go" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/utils" "github.com/streadway/amqp" ) const ( - rrID string = "rr-id" - rrJob string = "rr-job" - rrAttempt string = "rr-attempt" - rrMaxAttempts string = "rr-max_attempts" - rrTimeout string = "rr-timeout" - rrDelay string = "rr-delay" - rrRetryDelay string = "rr-retry_delay" + rrID string = "rr-id" + rrJob string = "rr-job" + // rrAttempt string = "rr-attempt" + // rrMaxAttempts string = "rr-max_attempts" + rrTimeout string = "rr-timeout" + rrDelay string = "rr-delay" + rrRetryDelay string = "rr-retry_delay" ) -func FromDelivery(d amqp.Delivery) *Item { - id, _, item, err := unpack(d) +func FromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + id, item, err := unpack(d) if err != nil { - panic(err) + return nil, errors.E(op, err) } return &Item{ Job: item.Job, @@ -33,7 +35,7 @@ func FromDelivery(d amqp.Delivery) *Item { Options: item.Options, AckFunc: d.Ack, NackFunc: d.Nack, - } + }, nil } func FromJob(job *structs.Job) *Item { @@ -41,14 +43,17 @@ func FromJob(job *structs.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, - Options: conv(*job.Options), + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: int64(job.Options.Delay), + Attempts: int64(job.Options.Attempts), + RetryDelay: int64(job.Options.RetryDelay), + Timeout: int64(job.Options.Timeout), + }, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains pluginName of job broker (usually PHP class). Job string `json:"job"` @@ -63,7 +68,7 @@ type Item struct { Headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options Options `json:"options,omitempty"` + Options *Options `json:"options,omitempty"` // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery AckFunc func(multiply bool) error @@ -85,21 +90,21 @@ type Options struct { Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay uint64 `json:"delay,omitempty"` + Delay int64 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts uint64 `json:"maxAttempts,omitempty"` + Attempts int64 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay uint64 `json:"retryDelay,omitempty"` + RetryDelay int64 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout uint64 `json:"timeout,omitempty"` + Timeout int64 `json:"timeout,omitempty"` } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt uint64) bool { +func (o *Options) CanRetry(attempt int64) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } @@ -143,7 +148,7 @@ func (j *Item) Context() ([]byte, error) { ID string `json:"id"` Job string `json:"job"` Headers map[string][]string `json:"headers"` - Timeout uint64 `json:"timeout"` + Timeout int64 `json:"timeout"` Pipeline string `json:"pipeline"` }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, ) @@ -163,51 +168,51 @@ func (j *Item) Nack() error { } // pack job metadata into headers -func pack(id string, attempt uint64, j *Item) amqp.Table { +func pack(id string, j *Item) amqp.Table { return amqp.Table{ - rrID: id, - rrJob: j.Job, - rrAttempt: attempt, - rrMaxAttempts: j.Options.Attempts, - rrTimeout: j.Options.Timeout, - rrDelay: j.Options.Delay, - rrRetryDelay: j.Options.RetryDelay, + rrID: id, + rrJob: j.Job, + // rrAttempt: attempt, + // rrMaxAttempts: j.Options.Attempts, + rrTimeout: j.Options.Timeout, + rrDelay: j.Options.Delay, + rrRetryDelay: j.Options.RetryDelay, } } // unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, attempt int, j *Item, err error) { - j = &Item{Payload: string(d.Body), Options: Options{}} +func unpack(d amqp.Delivery) (id string, j *Item, err error) { + j = &Item{Payload: string(d.Body), Options: &Options{}} if _, ok := d.Headers[rrID].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", rrID) + return "", nil, fmt.Errorf("missing header `%s`", rrID) } - if _, ok := d.Headers[rrAttempt].(uint64); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) - } + // if _, ok := d.Headers[rrAttempt].(uint64); !ok { + // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) + // } if _, ok := d.Headers[rrJob].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob) + return "", nil, fmt.Errorf("missing header `%s`", rrJob) } j.Job = d.Headers[rrJob].(string) - if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { - j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) - } + // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { + // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) + // } - if _, ok := d.Headers[rrTimeout].(uint64); ok { - j.Options.Timeout = d.Headers[rrTimeout].(uint64) + if _, ok := d.Headers[rrTimeout].(int64); ok { + j.Options.Timeout = d.Headers[rrTimeout].(int64) } - if _, ok := d.Headers[rrDelay].(uint64); ok { - j.Options.Delay = d.Headers[rrDelay].(uint64) + if _, ok := d.Headers[rrDelay].(int64); ok { + j.Options.Delay = d.Headers[rrDelay].(int64) } - if _, ok := d.Headers[rrRetryDelay].(uint64); ok { - j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64) + if _, ok := d.Headers[rrRetryDelay].(int64); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64) } - return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil + return d.Headers[rrID].(string), j, nil } |