summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
committerValery Piashchynski <[email protected]>2021-07-22 18:05:31 +0300
commite9713a1d08a93e2be70c889c600ed89f54822b54 (patch)
tree4198126207cc497453bf666c7411a67b8a4c39a2 /plugins/jobs/drivers/amqp/consumer.go
parent83b246c68ea1594de2462c4ada3498babae906fb (diff)
Fix AMQP bugs, add more amqp tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go7
1 files changed, 5 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index d592a17a..f6442b42 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) {
}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) {
l := atomic.LoadUint32(&j.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ j.log.Warn("amqp listener already in the active state")
return
}
@@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) {
// run listener
j.listener(deliv)
+ // increase number of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),