summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
committerValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
commite9713a1d08a93e2be70c889c600ed89f54822b54 (patch)
tree4198126207cc497453bf666c7411a67b8a4c39a2 /plugins/jobs/plugin.go
parent83b246c68ea1594de2462c4ada3498babae906fb (diff)
Fix AMQP bugs, add more amqp tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go57
1 files changed, 28 insertions, 29 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index d761de79..e118f732 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -386,44 +386,41 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
return nil
}
-func (p *Plugin) Pause(pipelines []string) {
- for i := 0; i < len(pipelines); i++ {
- pipe, ok := p.pipelines.Load(pipelines[i])
- if !ok {
- p.log.Error("no such pipeline", "requested", pipelines[i])
- }
+func (p *Plugin) Pause(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
- ppl := pipe.(*pipeline.Pipeline)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ }
- d, ok := p.consumers[ppl.Name()]
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
- return
- }
+ ppl := pipe.(*pipeline.Pipeline)
- // redirect call to the underlying driver
- d.Pause(ppl.Name())
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
}
-}
-func (p *Plugin) Resume(pipelines []string) {
- for i := 0; i < len(pipelines); i++ {
- pipe, ok := p.pipelines.Load(pipelines[i])
- if !ok {
- p.log.Error("no such pipeline", "requested", pipelines[i])
- }
+ // redirect call to the underlying driver
+ d.Pause(ppl.Name())
+}
- ppl := pipe.(*pipeline.Pipeline)
+func (p *Plugin) Resume(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ }
- d, ok := p.consumers[ppl.Name()]
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
- return
- }
+ ppl := pipe.(*pipeline.Pipeline)
- // redirect call to the underlying driver
- d.Resume(ppl.Name())
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
}
+
+ // redirect call to the underlying driver
+ d.Resume(ppl.Name())
}
// Declare a pipeline.
@@ -514,6 +511,8 @@ func (p *Plugin) RPC() interface{} {
func (p *Plugin) collectJobsEvents(event interface{}) {
if jev, ok := event.(events.JobEvent); ok {
switch jev.Event {
+ case events.EventPipePaused:
+ p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK: