summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 13:42:33 +0300
committerValery Piashchynski <[email protected]>2021-08-11 13:42:33 +0300
commitde37ed3ae8d08a50d9ffe088c1d58d9dffdf7c9b (patch)
tree712a47dde5941dd53c9f12ae41e62384df57b4b4 /plugins/jobs
parent2924f4b4daa3c53408a036583dcc39f6de805e2b (diff)
Add headers support to the jobs protocol
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/drivers/amqp/item.go3
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go4
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go3
-rw-r--r--plugins/jobs/drivers/sqs/item.go3
-rw-r--r--plugins/jobs/job/general.go43
-rw-r--r--plugins/jobs/job/job_options.go14
-rw-r--r--plugins/jobs/protocol.go9
-rw-r--r--plugins/jobs/response_protocol.md9
8 files changed, 28 insertions, 60 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 1a7ce00e..908dbd15 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -114,9 +114,10 @@ func (i *Item) Nack() error {
}
// Requeue with the provided delay, handled by the Nack
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return nil
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 91dbf41c..50c54b12 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -111,9 +111,10 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return nil
@@ -122,6 +123,7 @@ func (i *Item) Requeue(delay int64) error {
}
}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index d140c9ed..c1171ae2 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -100,11 +100,12 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
go func() {
time.Sleep(time.Second * time.Duration(delay))
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index cd2f6104..a761d6bd 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -140,9 +140,10 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return nil
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/general.go
index d2a27373..0a75f2e6 100644
--- a/plugins/jobs/job/general.go
+++ b/plugins/jobs/job/general.go
@@ -2,14 +2,13 @@ package job
// constant keys to pack/unpack messages from different drivers
const (
- RRID string = "rr_id"
- RRJob string = "rr_job"
- RRHeaders string = "rr_headers"
- RRPipeline string = "rr_pipeline"
- RRTimeout string = "rr_timeout"
- RRDelay string = "rr_delay"
- RRPriority string = "rr_priority"
- RRMaxAttempts string = "rr_max_attempts"
+ RRID string = "rr_id"
+ RRJob string = "rr_job"
+ RRHeaders string = "rr_headers"
+ RRPipeline string = "rr_pipeline"
+ RRTimeout string = "rr_timeout"
+ RRDelay string = "rr_delay"
+ RRPriority string = "rr_priority"
)
// Job carries information about single job.
@@ -29,31 +28,3 @@ type Job struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
-
-func (j Job) ID() string {
- panic("implement me")
-}
-
-func (j Job) Priority() int64 {
- panic("implement me")
-}
-
-func (j Job) Body() []byte {
- panic("implement me")
-}
-
-func (j Job) Context() ([]byte, error) {
- panic("implement me")
-}
-
-func (j Job) Ack() error {
- panic("implement me")
-}
-
-func (j Job) Nack() error {
- panic("implement me")
-}
-
-func (j Job) Requeue(delay uint32) error {
- panic("implement me")
-}
diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go
index af971d15..133ae1a8 100644
--- a/plugins/jobs/job/job_options.go
+++ b/plugins/jobs/job/job_options.go
@@ -14,10 +14,6 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
- // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
- // Minimum valuable value is 2.
- Attempts int64 `json:"maxAttempts,omitempty"`
-
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
RetryDelay int64 `json:"retryDelay,omitempty"`
@@ -31,10 +27,6 @@ func (o *Options) Merge(from *Options) {
o.Pipeline = from.Pipeline
}
- if o.Attempts == 0 {
- o.Attempts = from.Attempts
- }
-
if o.Timeout == 0 {
o.Timeout = from.Timeout
}
@@ -48,12 +40,6 @@ func (o *Options) Merge(from *Options) {
}
}
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int64) bool {
- // Attempts 1 and 0 has identical effect
- return o.Attempts > (attempt + 1)
-}
-
// RetryDuration returns retry delay duration in a form of time.Duration.
func (o *Options) RetryDuration() time.Duration {
return time.Second * time.Duration(o.RetryDelay)
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index e27f2868..691369d0 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -23,9 +23,10 @@ type protocol struct {
}
type errorResp struct {
- Msg string `json:"message"`
- Requeue bool `json:"requeue"`
- Delay int64 `json:"delay_seconds"`
+ Msg string `json:"message"`
+ Requeue bool `json:"requeue"`
+ Delay int64 `json:"delay_seconds"`
+ Headers map[string][]string `json:"headers"`
}
func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
@@ -57,7 +58,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
if er.Requeue {
- err = jb.Requeue(er.Delay)
+ err = jb.Requeue(er.Headers, er.Delay)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index 577317d4..77c78cb8 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,7 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds.
+ `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value.
For example:
@@ -40,7 +40,12 @@ For example:
"requeue": true,
"headers": [
{
- "test": "1",
+ "test": [
+ {
+ "ttt": "11",
+ "ggg": "22"
+ }
+ ],
"test2": "2"
}
],