summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/item.go')
-rw-r--r--plugins/jobs/drivers/sqs/item.go28
1 files changed, 1 insertions, 27 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index eac06731..df72b2e5 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -24,7 +24,6 @@ const (
var itemAttributes = []string{
job.RRJob,
job.RRDelay,
- job.RRTimeout,
job.RRPriority,
job.RRHeaders,
}
@@ -58,9 +57,6 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
- // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int64 `json:"timeout,omitempty"`
-
// Private ================
approxReceiveCount int64
queue *string
@@ -74,15 +70,6 @@ func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
}
-// TimeoutDuration returns timeout duration in a form of time.Duration.
-func (o *Options) TimeoutDuration() time.Duration {
- if o.Timeout == 0 {
- return 30 * time.Minute
- }
-
- return time.Second * time.Duration(o.Timeout)
-}
-
func (i *Item) ID() string {
return i.Ident
}
@@ -104,9 +91,8 @@ func (i *Item) Context() ([]byte, error) {
ID string `json:"id"`
Job string `json:"job"`
Headers map[string][]string `json:"headers"`
- Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
)
if err != nil {
@@ -172,10 +158,6 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
-func (i *Item) Recycle() {
- i.Options = nil
-}
-
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
@@ -186,7 +168,6 @@ func fromJob(job *job.Job) *Item {
Priority: job.Options.Priority,
Pipeline: job.Options.Pipeline,
Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
},
}
}
@@ -205,7 +186,6 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
MessageAttributes: map[string]types.MessageAttributeValue{
job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
- job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil},
job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
},
@@ -236,11 +216,6 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
return nil, errors.E(op, err)
}
- to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
if err != nil {
return nil, errors.E(op, err)
@@ -257,7 +232,6 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
Headers: h,
Options: &Options{
Delay: int64(delay),
- Timeout: int64(to),
Priority: int64(priority),
// private