diff options
author | Valery Piashchynski <[email protected]> | 2021-07-10 11:17:29 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-10 11:17:29 +0300 |
commit | 96d437f96785ada6aa5eb6d6ec9505b977ab3e74 (patch) | |
tree | e413299670acfe94d983029f9b9137571d925b10 /plugins/jobs/brokers/ephemeral | |
parent | fa57fa609d14e4ebf4cbffc154804402906eecaa (diff) |
Update Consumer interface, List method returns []string of the
pipelines.
Update packing and unpacking
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 6 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 31 |
2 files changed, 10 insertions, 27 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 030dcae8..09e78249 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -121,11 +121,11 @@ func (j *JobBroker) Resume(pipeline string) { } } -func (j *JobBroker) List() []*pipeline.Pipeline { - out := make([]*pipeline.Pipeline, 0, 2) +func (j *JobBroker) List() []string { + out := make([]string, 0, 2) j.queues.Range(func(key, value interface{}) bool { - pipe := key.(*pipeline.Pipeline) + pipe := key.(string) out = append(out, pipe) return true }) diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index 76e83d00..211ac56a 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -13,14 +13,15 @@ func From(job *structs.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, - Options: conv(*job.Options), + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -35,7 +36,7 @@ type Item struct { Headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options Options `json:"options,omitempty"` + Options *Options `json:"options,omitempty"` } // Options carry information about how to handle given job. @@ -50,28 +51,10 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay uint64 `json:"delay,omitempty"` - // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). - // Minimum valuable value is 2. - Attempts uint64 `json:"maxAttempts,omitempty"` - - // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay uint64 `json:"retryDelay,omitempty"` - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout uint64 `json:"timeout,omitempty"` } -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt uint64) bool { - // Attempts 1 and 0 has identical effect - return o.Attempts > (attempt + 1) -} - -// RetryDuration returns retry delay duration in a form of time.Duration. -func (o *Options) RetryDuration() time.Duration { - return time.Second * time.Duration(o.RetryDelay) -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) |