diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/item.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 28 |
1 files changed, 1 insertions, 27 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index eac06731..df72b2e5 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -24,7 +24,6 @@ const ( var itemAttributes = []string{ job.RRJob, job.RRDelay, - job.RRTimeout, job.RRPriority, job.RRHeaders, } @@ -58,9 +57,6 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay int64 `json:"delay,omitempty"` - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int64 `json:"timeout,omitempty"` - // Private ================ approxReceiveCount int64 queue *string @@ -74,15 +70,6 @@ func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) } -// TimeoutDuration returns timeout duration in a form of time.Duration. -func (o *Options) TimeoutDuration() time.Duration { - if o.Timeout == 0 { - return 30 * time.Minute - } - - return time.Second * time.Duration(o.Timeout) -} - func (i *Item) ID() string { return i.Ident } @@ -104,9 +91,8 @@ func (i *Item) Context() ([]byte, error) { ID string `json:"id"` Job string `json:"job"` Headers map[string][]string `json:"headers"` - Timeout int64 `json:"timeout"` Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, ) if err != nil { @@ -172,10 +158,6 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } -func (i *Item) Recycle() { - i.Options = nil -} - func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, @@ -186,7 +168,6 @@ func fromJob(job *job.Job) *Item { Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, Delay: job.Options.Delay, - Timeout: job.Options.Timeout, }, } } @@ -205,7 +186,6 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { MessageAttributes: map[string]types.MessageAttributeValue{ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, - job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil}, job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, }, @@ -236,11 +216,6 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { return nil, errors.E(op, err) } - to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue) if err != nil { return nil, errors.E(op, err) @@ -257,7 +232,6 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { Headers: h, Options: &Options{ Delay: int64(delay), - Timeout: int64(to), Priority: int64(priority), // private |