summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
committerValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
commite9713a1d08a93e2be70c889c600ed89f54822b54 (patch)
tree4198126207cc497453bf666c7411a67b8a4c39a2 /plugins
parent83b246c68ea1594de2462c4ada3498babae906fb (diff)
Fix AMQP bugs, add more amqp tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go7
-rw-r--r--plugins/jobs/plugin.go57
-rw-r--r--plugins/jobs/rpc.go38
3 files changed, 58 insertions, 44 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index d592a17a..f6442b42 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) {
}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) {
l := atomic.LoadUint32(&j.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ j.log.Warn("amqp listener already in the active state")
return
}
@@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) {
// run listener
j.listener(deliv)
+ // increase number of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index d761de79..e118f732 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -386,44 +386,41 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
return nil
}
-func (p *Plugin) Pause(pipelines []string) {
- for i := 0; i < len(pipelines); i++ {
- pipe, ok := p.pipelines.Load(pipelines[i])
- if !ok {
- p.log.Error("no such pipeline", "requested", pipelines[i])
- }
+func (p *Plugin) Pause(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
- ppl := pipe.(*pipeline.Pipeline)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ }
- d, ok := p.consumers[ppl.Name()]
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
- return
- }
+ ppl := pipe.(*pipeline.Pipeline)
- // redirect call to the underlying driver
- d.Pause(ppl.Name())
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
}
-}
-func (p *Plugin) Resume(pipelines []string) {
- for i := 0; i < len(pipelines); i++ {
- pipe, ok := p.pipelines.Load(pipelines[i])
- if !ok {
- p.log.Error("no such pipeline", "requested", pipelines[i])
- }
+ // redirect call to the underlying driver
+ d.Pause(ppl.Name())
+}
- ppl := pipe.(*pipeline.Pipeline)
+func (p *Plugin) Resume(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ }
- d, ok := p.consumers[ppl.Name()]
- if !ok {
- p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
- return
- }
+ ppl := pipe.(*pipeline.Pipeline)
- // redirect call to the underlying driver
- d.Resume(ppl.Name())
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
}
+
+ // redirect call to the underlying driver
+ d.Resume(ppl.Name())
}
// Declare a pipeline.
@@ -514,6 +511,8 @@ func (p *Plugin) RPC() interface{} {
func (p *Plugin) collectJobsEvents(event interface{}) {
if jev, ok := event.(events.JobEvent); ok {
switch jev.Event {
+ case events.EventPipePaused:
+ p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 10158e74..0d15fb0f 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -66,29 +66,23 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
return nil
}
-func (r *rpc) Pause(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error {
- pipelines := make([]string, len(req.GetPipelines()))
-
- for i := 0; i < len(pipelines); i++ {
- pipelines[i] = req.GetPipelines()[i]
+func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ r.p.Pause(req.GetPipelines()[i])
}
- r.p.Pause(pipelines)
return nil
}
-func (r *rpc) Resume(req *jobsv1beta.Maintenance, _ *jobsv1beta.Empty) error {
- pipelines := make([]string, len(req.GetPipelines()))
-
- for i := 0; i < len(pipelines); i++ {
- pipelines[i] = req.GetPipelines()[i]
+func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ r.p.Resume(req.GetPipelines()[i])
}
- r.p.Resume(pipelines)
return nil
}
-func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Maintenance) error {
+func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
resp.Pipelines = r.p.List()
return nil
}
@@ -114,6 +108,24 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
return nil
}
+func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
+ const op = errors.Op("rcp_declare_pipeline")
+
+ var destroyed []string
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ err := r.p.Destroy(req.GetPipelines()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+ destroyed = append(destroyed, req.GetPipelines()[i])
+ }
+
+ // return destroyed pipelines
+ resp.Pipelines = destroyed
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}