diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..f411e9a0 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -455,7 +455,6 @@ func (p *Plugin) Push(j *job.Job) error { } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -470,9 +469,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +481,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -616,6 +615,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) + p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name()) return nil } @@ -638,11 +638,19 @@ func (p *Plugin) Destroy(pp string) error { // delete consumer delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + d = nil + cancel() + return nil } func (p *Plugin) List() []string { |