summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/job/job.go11
-rw-r--r--plugins/jobs/job/job_test.go27
-rw-r--r--plugins/jobs/plugin.go22
3 files changed, 15 insertions, 45 deletions
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go
index 06c3254e..adab2a0a 100644
--- a/plugins/jobs/job/job.go
+++ b/plugins/jobs/job/job.go
@@ -45,17 +45,6 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
}
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go
index a47151a3..4a95e27d 100644
--- a/plugins/jobs/job/job_test.go
+++ b/plugins/jobs/job/job_test.go
@@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) {
opts := &Options{Delay: 1}
assert.Equal(t, time.Second, opts.DelayDuration())
}
-
-func TestOptions_Merge(t *testing.T) {
- opts := &Options{}
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, int64(2), opts.Delay)
-}
-
-func TestOptions_MergeKeepOriginal(t *testing.T) {
- opts := &Options{
- Pipeline: "default",
- Delay: 10,
- }
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, int64(10), opts.Delay)
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 3f3fa196..f411e9a0 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -455,7 +455,6 @@ func (p *Plugin) Push(j *job.Job) error {
}
// if job has no priority, inherit it from the pipeline
- // TODO(rustatian) merge all options, not only priority
if j.Options.Priority == 0 {
j.Options.Priority = ppl.Priority()
}
@@ -470,9 +469,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return errors.E(op, err)
}
@@ -482,9 +481,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return nil
@@ -492,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error {
func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
for i := 0; i < len(j); i++ {
- start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -616,6 +615,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// save the pipeline
p.pipelines.Store(pipeline.Name(), pipeline)
+ p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name())
return nil
}
@@ -638,11 +638,19 @@ func (p *Plugin) Destroy(pp string) error {
// delete consumer
delete(p.consumers, ppl.Name())
- p.pipelines.Delete(pp)
+ // delete old pipeline
+ p.pipelines.LoadAndDelete(pp)
+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
+ err := d.Stop(ctx)
+ if err != nil {
+ cancel()
+ return errors.E(op, err)
+ }
- return d.Stop(ctx)
+ d = nil
+ cancel()
+ return nil
}
func (p *Plugin) List() []string {