diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 13:42:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 13:42:33 +0300 |
commit | de37ed3ae8d08a50d9ffe088c1d58d9dffdf7c9b (patch) | |
tree | 712a47dde5941dd53c9f12ae41e62384df57b4b4 /plugins/jobs | |
parent | 2924f4b4daa3c53408a036583dcc39f6de805e2b (diff) |
Add headers support to the jobs protocol
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 3 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 3 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 3 | ||||
-rw-r--r-- | plugins/jobs/job/general.go | 43 | ||||
-rw-r--r-- | plugins/jobs/job/job_options.go | 14 | ||||
-rw-r--r-- | plugins/jobs/protocol.go | 9 | ||||
-rw-r--r-- | plugins/jobs/response_protocol.md | 9 |
8 files changed, 28 insertions, 60 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 1a7ce00e..908dbd15 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -114,9 +114,10 @@ func (i *Item) Nack() error { } // Requeue with the provided delay, handled by the Nack -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return nil diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 91dbf41c..50c54b12 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -111,9 +111,10 @@ func (i *Item) Nack() error { return i.Options.conn.Delete(i.Options.id) } -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return nil @@ -122,6 +123,7 @@ func (i *Item) Requeue(delay int64) error { } } + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index d140c9ed..c1171ae2 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -100,11 +100,12 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { go func() { time.Sleep(time.Second * time.Duration(delay)) // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index cd2f6104..a761d6bd 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -140,9 +140,10 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return nil diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/general.go index d2a27373..0a75f2e6 100644 --- a/plugins/jobs/job/general.go +++ b/plugins/jobs/job/general.go @@ -2,14 +2,13 @@ package job // constant keys to pack/unpack messages from different drivers const ( - RRID string = "rr_id" - RRJob string = "rr_job" - RRHeaders string = "rr_headers" - RRPipeline string = "rr_pipeline" - RRTimeout string = "rr_timeout" - RRDelay string = "rr_delay" - RRPriority string = "rr_priority" - RRMaxAttempts string = "rr_max_attempts" + RRID string = "rr_id" + RRJob string = "rr_job" + RRHeaders string = "rr_headers" + RRPipeline string = "rr_pipeline" + RRTimeout string = "rr_timeout" + RRDelay string = "rr_delay" + RRPriority string = "rr_priority" ) // Job carries information about single job. @@ -29,31 +28,3 @@ type Job struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` } - -func (j Job) ID() string { - panic("implement me") -} - -func (j Job) Priority() int64 { - panic("implement me") -} - -func (j Job) Body() []byte { - panic("implement me") -} - -func (j Job) Context() ([]byte, error) { - panic("implement me") -} - -func (j Job) Ack() error { - panic("implement me") -} - -func (j Job) Nack() error { - panic("implement me") -} - -func (j Job) Requeue(delay uint32) error { - panic("implement me") -} diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go index af971d15..133ae1a8 100644 --- a/plugins/jobs/job/job_options.go +++ b/plugins/jobs/job/job_options.go @@ -14,10 +14,6 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay int64 `json:"delay,omitempty"` - // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). - // Minimum valuable value is 2. - Attempts int64 `json:"maxAttempts,omitempty"` - // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. RetryDelay int64 `json:"retryDelay,omitempty"` @@ -31,10 +27,6 @@ func (o *Options) Merge(from *Options) { o.Pipeline = from.Pipeline } - if o.Attempts == 0 { - o.Attempts = from.Attempts - } - if o.Timeout == 0 { o.Timeout = from.Timeout } @@ -48,12 +40,6 @@ func (o *Options) Merge(from *Options) { } } -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int64) bool { - // Attempts 1 and 0 has identical effect - return o.Attempts > (attempt + 1) -} - // RetryDuration returns retry delay duration in a form of time.Duration. func (o *Options) RetryDuration() time.Duration { return time.Second * time.Duration(o.RetryDelay) diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go index e27f2868..691369d0 100644 --- a/plugins/jobs/protocol.go +++ b/plugins/jobs/protocol.go @@ -23,9 +23,10 @@ type protocol struct { } type errorResp struct { - Msg string `json:"message"` - Requeue bool `json:"requeue"` - Delay int64 `json:"delay_seconds"` + Msg string `json:"message"` + Requeue bool `json:"requeue"` + Delay int64 `json:"delay_seconds"` + Headers map[string][]string `json:"headers"` } func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { @@ -57,7 +58,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) if er.Requeue { - err = jb.Requeue(er.Delay) + err = jb.Requeue(er.Headers, er.Delay) if err != nil { return errors.E(op, err) } diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index 577317d4..77c78cb8 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -15,7 +15,7 @@ Types are: - `NO_ERROR`: contains only `type` and empty `data`. - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, - `dalay_seconds`: to delay a queue for a provided amount of seconds. + `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value. For example: @@ -40,7 +40,12 @@ For example: "requeue": true, "headers": [ { - "test": "1", + "test": [ + { + "ttt": "11", + "ggg": "22" + } + ], "test2": "2" } ], |