diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 18:05:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 18:05:31 +0300 |
commit | e9713a1d08a93e2be70c889c600ed89f54822b54 (patch) | |
tree | 4198126207cc497453bf666c7411a67b8a4c39a2 /plugins/jobs/drivers/amqp | |
parent | 83b246c68ea1594de2462c4ada3498babae906fb (diff) |
Fix AMQP bugs, add more amqp tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d592a17a..f6442b42 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -384,7 +384,7 @@ func (j *JobsConsumer) Pause(p string) { } j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), @@ -404,7 +404,7 @@ func (j *JobsConsumer) Resume(p string) { l := atomic.LoadUint32(&j.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + j.log.Warn("amqp listener already in the active state") return } @@ -439,6 +439,9 @@ func (j *JobsConsumer) Resume(p string) { // run listener j.listener(deliv) + // increase number of listeners + atomic.AddUint32(&j.listeners, 1) + j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), |