summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/ephemeral
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 11:17:29 +0300
committerValery Piashchynski <[email protected]>2021-07-10 11:17:29 +0300
commit96d437f96785ada6aa5eb6d6ec9505b977ab3e74 (patch)
treee413299670acfe94d983029f9b9137571d925b10 /plugins/jobs/brokers/ephemeral
parentfa57fa609d14e4ebf4cbffc154804402906eecaa (diff)
Update Consumer interface, List method returns []string of the
pipelines. Update packing and unpacking Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go6
-rw-r--r--plugins/jobs/brokers/ephemeral/item.go31
2 files changed, 10 insertions, 27 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 030dcae8..09e78249 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -121,11 +121,11 @@ func (j *JobBroker) Resume(pipeline string) {
}
}
-func (j *JobBroker) List() []*pipeline.Pipeline {
- out := make([]*pipeline.Pipeline, 0, 2)
+func (j *JobBroker) List() []string {
+ out := make([]string, 0, 2)
j.queues.Range(func(key, value interface{}) bool {
- pipe := key.(*pipeline.Pipeline)
+ pipe := key.(string)
out = append(out, pipe)
return true
})
diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go
index 76e83d00..211ac56a 100644
--- a/plugins/jobs/brokers/ephemeral/item.go
+++ b/plugins/jobs/brokers/ephemeral/item.go
@@ -13,14 +13,15 @@ func From(job *structs.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
- Options: conv(*job.Options),
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
}
}
-func conv(jo structs.Options) Options {
- return Options(jo)
-}
-
type Item struct {
// Job contains name of job broker (usually PHP class).
Job string `json:"job"`
@@ -35,7 +36,7 @@ type Item struct {
Headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options Options `json:"options,omitempty"`
+ Options *Options `json:"options,omitempty"`
}
// Options carry information about how to handle given job.
@@ -50,28 +51,10 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay uint64 `json:"delay,omitempty"`
- // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
- // Minimum valuable value is 2.
- Attempts uint64 `json:"maxAttempts,omitempty"`
-
- // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay uint64 `json:"retryDelay,omitempty"`
-
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout uint64 `json:"timeout,omitempty"`
}
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt uint64) bool {
- // Attempts 1 and 0 has identical effect
- return o.Attempts > (attempt + 1)
-}
-
-// RetryDuration returns retry delay duration in a form of time.Duration.
-func (o *Options) RetryDuration() time.Duration {
- return time.Second * time.Duration(o.RetryDelay)
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)