diff options
author | Valery Piashchynski <[email protected]> | 2021-08-29 23:46:11 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-29 23:46:11 +0300 |
commit | c23a88a943b53b99d112b63ed121931d1f79436f (patch) | |
tree | 5373bb61fec4ceb5db041f7207cec7ef115388d1 /plugins/jobs | |
parent | 22e17a99fe2087f9c11a438e877afbac0096c052 (diff) |
Implement Init, FromPipeline methods
Update receiver in the amqp driver
Add simple (initial) boltdb tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/job/job.go (renamed from plugins/jobs/job/general.go) | 33 | ||||
-rw-r--r-- | plugins/jobs/job/job_options.go | 32 | ||||
-rw-r--r-- | plugins/jobs/job/job_test.go (renamed from plugins/jobs/job/job_options_test.go) | 0 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 |
4 files changed, 34 insertions, 33 deletions
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/job.go index 390f44b5..06c3254e 100644 --- a/plugins/jobs/job/general.go +++ b/plugins/jobs/job/job.go @@ -1,5 +1,9 @@ package job +import ( + "time" +) + // constant keys to pack/unpack messages from different drivers const ( RRID string = "rr_id" @@ -27,3 +31,32 @@ type Job struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` } + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go deleted file mode 100644 index b7e4ed36..00000000 --- a/plugins/jobs/job/job_options.go +++ /dev/null @@ -1,32 +0,0 @@ -package job - -import "time" - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` -} - -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_test.go index a47151a3..a47151a3 100644 --- a/plugins/jobs/job/job_options_test.go +++ b/plugins/jobs/job/job_test.go diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5e62c5c5..a0b477f9 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -178,7 +178,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit }) var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) if err != nil { errCh <- err return errCh |