diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 18:05:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 18:05:31 +0300 |
commit | e9713a1d08a93e2be70c889c600ed89f54822b54 (patch) | |
tree | 4198126207cc497453bf666c7411a67b8a4c39a2 /plugins | |
parent | 83b246c68ea1594de2462c4ada3498babae906fb (diff) |
Fix AMQP bugs, add more amqp tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 7 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 57 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 38 |
3 files changed, 58 insertions, 44 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d592a17a..f6442b42 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) { } j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), @@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) { l := atomic.LoadUint32(&j.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + j.log.Warn("amqp listener already in the active state") return } @@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) { // run listener j.listener(deliv) + // increase number of listeners + atomic.AddUint32(&j.listeners, 1) + j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d761de79..e118f732 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -386,44 +386,41 @@ func (p *Plugin) PushBatch(j []*job.Job) error { return nil } -func (p *Plugin) Pause(pipelines []string) { - for i := 0; i < len(pipelines); i++ { - pipe, ok := p.pipelines.Load(pipelines[i]) - if !ok { - p.log.Error("no such pipeline", "requested", pipelines[i]) - } +func (p *Plugin) Pause(pp string) { + pipe, ok := p.pipelines.Load(pp) - ppl := pipe.(*pipeline.Pipeline) + if !ok { + p.log.Error("no such pipeline", "requested", pp) + } - d, ok := p.consumers[ppl.Name()] - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) - return - } + ppl := pipe.(*pipeline.Pipeline) - // redirect call to the underlying driver - d.Pause(ppl.Name()) + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pp) + return } -} -func (p *Plugin) Resume(pipelines []string) { - for i := 0; i < len(pipelines); i++ { - pipe, ok := p.pipelines.Load(pipelines[i]) - if !ok { - p.log.Error("no such pipeline", "requested", pipelines[i]) - } + // redirect call to the underlying driver + d.Pause(ppl.Name()) +} - ppl := pipe.(*pipeline.Pipeline) +func (p *Plugin) Resume(pp string) { + pipe, ok := p.pipelines.Load(pp) + if !ok { + p.log.Error("no such pipeline", "requested", pp) + } - d, ok := p.consumers[ppl.Name()] - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) - return - } + ppl := pipe.(*pipeline.Pipeline) - // redirect call to the underlying driver - d.Resume(ppl.Name()) + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pp) + return } + + // redirect call to the underlying driver + d.Resume(ppl.Name()) } // Declare a pipeline. @@ -514,6 +511,8 @@ func (p *Plugin) RPC() interface{} { func (p *Plugin) collectJobsEvents(event interface{}) { if jev, ok := event.(events.JobEvent); ok { switch jev.Event { + case events.EventPipePaused: + p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 10158e74..0d15fb0f 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -66,29 +66,23 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err return nil } -func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { - pipelines := make([]string, len(req.GetPipelines())) - - for i := 0; i < len(pipelines); i++ { - pipelines[i] = req.GetPipelines()[i] +func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Pause(req.GetPipelines()[i]) } - r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error { - pipelines := make([]string, len(req.GetPipelines())) - - for i := 0; i < len(pipelines); i++ { - pipelines[i] = req.GetPipelines()[i] +func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Resume(req.GetPipelines()[i]) } - r.p.Resume(pipelines) return nil } -func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error { +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { resp.Pipelines = r.p.List() return nil } @@ -114,6 +108,24 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error return nil } +func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { + const op = errors.Op("rcp_declare_pipeline") + + var destroyed []string + for i := 0; i < len(req.GetPipelines()); i++ { + err := r.p.Destroy(req.GetPipelines()[i]) + if err != nil { + return errors.E(op, err) + } + destroyed = append(destroyed, req.GetPipelines()[i]) + } + + // return destroyed pipelines + resp.Pipelines = destroyed + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} |