From e9713a1d08a93e2be70c889c600ed89f54822b54 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 18:05:31 +0300 Subject: Fix AMQP bugs, add more amqp tests Signed-off-by: Valery Piashchynski --- plugins/jobs/drivers/amqp/consumer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'plugins/jobs/drivers/amqp') diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d592a17a..f6442b42 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) { } j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), @@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) { l := atomic.LoadUint32(&j.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + j.log.Warn("amqp listener already in the active state") return } @@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) { // run listener j.listener(deliv) + // increase number of listeners + atomic.AddUint32(&j.listeners, 1) + j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), -- cgit v1.2.3