summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/item.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 11:17:29 +0300
committerValery Piashchynski <[email protected]>2021-07-10 11:17:29 +0300
commit96d437f96785ada6aa5eb6d6ec9505b977ab3e74 (patch)
treee413299670acfe94d983029f9b9137571d925b10 /plugins/jobs/brokers/amqp/item.go
parentfa57fa609d14e4ebf4cbffc154804402906eecaa (diff)
Update Consumer interface, List method returns []string of the
pipelines. Update packing and unpacking Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r--plugins/jobs/brokers/amqp/item.go101
1 files changed, 53 insertions, 48 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 190e72e8..e5e580e0 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -5,25 +5,27 @@ import (
"time"
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/utils"
"github.com/streadway/amqp"
)
const (
- rrID string = "rr-id"
- rrJob string = "rr-job"
- rrAttempt string = "rr-attempt"
- rrMaxAttempts string = "rr-max_attempts"
- rrTimeout string = "rr-timeout"
- rrDelay string = "rr-delay"
- rrRetryDelay string = "rr-retry_delay"
+ rrID string = "rr-id"
+ rrJob string = "rr-job"
+ // rrAttempt string = "rr-attempt"
+ // rrMaxAttempts string = "rr-max_attempts"
+ rrTimeout string = "rr-timeout"
+ rrDelay string = "rr-delay"
+ rrRetryDelay string = "rr-retry_delay"
)
-func FromDelivery(d amqp.Delivery) *Item {
- id, _, item, err := unpack(d)
+func FromDelivery(d amqp.Delivery) (*Item, error) {
+ const op = errors.Op("from_delivery_convert")
+ id, item, err := unpack(d)
if err != nil {
- panic(err)
+ return nil, errors.E(op, err)
}
return &Item{
Job: item.Job,
@@ -33,7 +35,7 @@ func FromDelivery(d amqp.Delivery) *Item {
Options: item.Options,
AckFunc: d.Ack,
NackFunc: d.Nack,
- }
+ }, nil
}
func FromJob(job *structs.Job) *Item {
@@ -41,14 +43,17 @@ func FromJob(job *structs.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
- Options: conv(*job.Options),
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: int64(job.Options.Delay),
+ Attempts: int64(job.Options.Attempts),
+ RetryDelay: int64(job.Options.RetryDelay),
+ Timeout: int64(job.Options.Timeout),
+ },
}
}
-func conv(jo structs.Options) Options {
- return Options(jo)
-}
-
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
@@ -63,7 +68,7 @@ type Item struct {
Headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options Options `json:"options,omitempty"`
+ Options *Options `json:"options,omitempty"`
// Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
AckFunc func(multiply bool) error
@@ -85,21 +90,21 @@ type Options struct {
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay uint64 `json:"delay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts uint64 `json:"maxAttempts,omitempty"`
+ Attempts int64 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay uint64 `json:"retryDelay,omitempty"`
+ RetryDelay int64 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout uint64 `json:"timeout,omitempty"`
+ Timeout int64 `json:"timeout,omitempty"`
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt uint64) bool {
+func (o *Options) CanRetry(attempt int64) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
@@ -143,7 +148,7 @@ func (j *Item) Context() ([]byte, error) {
ID string `json:"id"`
Job string `json:"job"`
Headers map[string][]string `json:"headers"`
- Timeout uint64 `json:"timeout"`
+ Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
}{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
)
@@ -163,51 +168,51 @@ func (j *Item) Nack() error {
}
// pack job metadata into headers
-func pack(id string, attempt uint64, j *Item) amqp.Table {
+func pack(id string, j *Item) amqp.Table {
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- rrAttempt: attempt,
- rrMaxAttempts: j.Options.Attempts,
- rrTimeout: j.Options.Timeout,
- rrDelay: j.Options.Delay,
- rrRetryDelay: j.Options.RetryDelay,
+ rrID: id,
+ rrJob: j.Job,
+ // rrAttempt: attempt,
+ // rrMaxAttempts: j.Options.Attempts,
+ rrTimeout: j.Options.Timeout,
+ rrDelay: j.Options.Delay,
+ rrRetryDelay: j.Options.RetryDelay,
}
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, attempt int, j *Item, err error) {
- j = &Item{Payload: string(d.Body), Options: Options{}}
+func unpack(d amqp.Delivery) (id string, j *Item, err error) {
+ j = &Item{Payload: string(d.Body), Options: &Options{}}
if _, ok := d.Headers[rrID].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
+ return "", nil, fmt.Errorf("missing header `%s`", rrID)
}
- if _, ok := d.Headers[rrAttempt].(uint64); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
- }
+ // if _, ok := d.Headers[rrAttempt].(uint64); !ok {
+ // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
+ // }
if _, ok := d.Headers[rrJob].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
+ return "", nil, fmt.Errorf("missing header `%s`", rrJob)
}
j.Job = d.Headers[rrJob].(string)
- if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
- j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
- }
+ // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
+ // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
+ // }
- if _, ok := d.Headers[rrTimeout].(uint64); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(uint64)
+ if _, ok := d.Headers[rrTimeout].(int64); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(int64)
}
- if _, ok := d.Headers[rrDelay].(uint64); ok {
- j.Options.Delay = d.Headers[rrDelay].(uint64)
+ if _, ok := d.Headers[rrDelay].(int64); ok {
+ j.Options.Delay = d.Headers[rrDelay].(int64)
}
- if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
+ if _, ok := d.Headers[rrRetryDelay].(int64); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64)
}
- return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
+ return d.Headers[rrID].(string), j, nil
}