From 96d437f96785ada6aa5eb6d6ec9505b977ab3e74 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 10 Jul 2021 11:17:29 +0300 Subject: Update Consumer interface, List method returns []string of the pipelines. Update packing and unpacking Signed-off-by: Valery Piashchynski --- plugins/jobs/brokers/ephemeral/consumer.go | 6 +++--- plugins/jobs/brokers/ephemeral/item.go | 31 +++++++----------------------- 2 files changed, 10 insertions(+), 27 deletions(-) (limited to 'plugins/jobs/brokers/ephemeral') diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 030dcae8..09e78249 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -121,11 +121,11 @@ func (j *JobBroker) Resume(pipeline string) { } } -func (j *JobBroker) List() []*pipeline.Pipeline { - out := make([]*pipeline.Pipeline, 0, 2) +func (j *JobBroker) List() []string { + out := make([]string, 0, 2) j.queues.Range(func(key, value interface{}) bool { - pipe := key.(*pipeline.Pipeline) + pipe := key.(string) out = append(out, pipe) return true }) diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index 76e83d00..211ac56a 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -13,14 +13,15 @@ func From(job *structs.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, - Options: conv(*job.Options), + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -35,7 +36,7 @@ type Item struct { Headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options Options `json:"options,omitempty"` + Options *Options `json:"options,omitempty"` } // Options carry information about how to handle given job. @@ -50,28 +51,10 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay uint64 `json:"delay,omitempty"` - // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). - // Minimum valuable value is 2. - Attempts uint64 `json:"maxAttempts,omitempty"` - - // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay uint64 `json:"retryDelay,omitempty"` - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout uint64 `json:"timeout,omitempty"` } -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt uint64) bool { - // Attempts 1 and 0 has identical effect - return o.Attempts > (attempt + 1) -} - -// RetryDuration returns retry delay duration in a form of time.Duration. -func (o *Options) RetryDuration() time.Duration { - return time.Second * time.Duration(o.RetryDelay) -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) -- cgit v1.2.3