diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/job/job.go | 11 | ||||
-rw-r--r-- | plugins/jobs/job/job_test.go | 27 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 22 |
3 files changed, 15 insertions, 45 deletions
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go index 06c3254e..adab2a0a 100644 --- a/plugins/jobs/job/job.go +++ b/plugins/jobs/job/job.go @@ -45,17 +45,6 @@ type Options struct { Delay int64 `json:"delay,omitempty"` } -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go index a47151a3..4a95e27d 100644 --- a/plugins/jobs/job/job_test.go +++ b/plugins/jobs/job/job_test.go @@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) { opts := &Options{Delay: 1} assert.Equal(t, time.Second, opts.DelayDuration()) } - -func TestOptions_Merge(t *testing.T) { - opts := &Options{} - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, int64(2), opts.Delay) -} - -func TestOptions_MergeKeepOriginal(t *testing.T) { - opts := &Options{ - Pipeline: "default", - Delay: 10, - } - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, int64(10), opts.Delay) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..f411e9a0 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -455,7 +455,6 @@ func (p *Plugin) Push(j *job.Job) error { } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -470,9 +469,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +481,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -616,6 +615,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) + p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name()) return nil } @@ -638,11 +638,19 @@ func (p *Plugin) Destroy(pp string) error { // delete consumer delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + d = nil + cancel() + return nil } func (p *Plugin) List() []string { |