summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-01 16:50:41 +0300
committerValery Piashchynski <[email protected]>2021-09-01 16:50:41 +0300
commitd62acca114a9646afed6ec0217b8cb709687aeb9 (patch)
tree3357194e05e6edbac46e2d85e4e98ef0d388480e /plugins/amqp/amqpjobs/consumer.go
parent5ad241b23b64faf7389c424bdecd3489338fa1ba (diff)
Close connection in the amqp driver.
bytes.Buffer update in the beanstalk driver Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/amqp/amqpjobs/consumer.go')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go6
1 files changed, 3 insertions, 3 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 784a102c..1bfc4b41 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -420,17 +420,17 @@ func (c *consumer) Resume(_ context.Context, p string) {
}
func (c *consumer) Stop(context.Context) error {
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
- }
+ c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
})
+
return nil
}