diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 57 |
1 files changed, 28 insertions, 29 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d761de79..e118f732 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -386,44 +386,41 @@ func (p *Plugin) PushBatch(j []*job.Job) error { return nil } -func (p *Plugin) Pause(pipelines []string) { - for i := 0; i < len(pipelines); i++ { - pipe, ok := p.pipelines.Load(pipelines[i]) - if !ok { - p.log.Error("no such pipeline", "requested", pipelines[i]) - } +func (p *Plugin) Pause(pp string) { + pipe, ok := p.pipelines.Load(pp) - ppl := pipe.(*pipeline.Pipeline) + if !ok { + p.log.Error("no such pipeline", "requested", pp) + } - d, ok := p.consumers[ppl.Name()] - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) - return - } + ppl := pipe.(*pipeline.Pipeline) - // redirect call to the underlying driver - d.Pause(ppl.Name()) + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pp) + return } -} -func (p *Plugin) Resume(pipelines []string) { - for i := 0; i < len(pipelines); i++ { - pipe, ok := p.pipelines.Load(pipelines[i]) - if !ok { - p.log.Error("no such pipeline", "requested", pipelines[i]) - } + // redirect call to the underlying driver + d.Pause(ppl.Name()) +} - ppl := pipe.(*pipeline.Pipeline) +func (p *Plugin) Resume(pp string) { + pipe, ok := p.pipelines.Load(pp) + if !ok { + p.log.Error("no such pipeline", "requested", pp) + } - d, ok := p.consumers[ppl.Name()] - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) - return - } + ppl := pipe.(*pipeline.Pipeline) - // redirect call to the underlying driver - d.Resume(ppl.Name()) + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pp) + return } + + // redirect call to the underlying driver + d.Resume(ppl.Name()) } // Declare a pipeline. @@ -514,6 +511,8 @@ func (p *Plugin) RPC() interface{} { func (p *Plugin) collectJobsEvents(event interface{}) { if jev, ok := event.(events.JobEvent); ok { switch jev.Event { + case events.EventPipePaused: + p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: |