summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-01 16:50:41 +0300
committerValery Piashchynski <[email protected]>2021-09-01 16:50:41 +0300
commitd62acca114a9646afed6ec0217b8cb709687aeb9 (patch)
tree3357194e05e6edbac46e2d85e4e98ef0d388480e /plugins/amqp/amqpjobs
parent5ad241b23b64faf7389c424bdecd3489338fa1ba (diff)
Close connection in the amqp driver.
bytes.Buffer update in the beanstalk driver Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/amqp/amqpjobs')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go6
-rw-r--r--plugins/amqp/amqpjobs/item.go7
-rw-r--r--plugins/amqp/amqpjobs/redial.go21
3 files changed, 16 insertions, 18 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 784a102c..1bfc4b41 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -420,17 +420,17 @@ func (c *consumer) Resume(_ context.Context, p string) {
}
func (c *consumer) Stop(context.Context) error {
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
- }
+ c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
})
+
return nil
}
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index 04385afe..b837ff86 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -43,17 +43,18 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
// private
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
ack func(multiply bool) error
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
// When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
// When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
+ requeueFn func(context.Context, *Item) error
+ // delayed jobs TODO(rustatian): figure out how to get stats from the DLX
delayed *int64
multipleAsk bool
requeue bool
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 8d21784f..698a34a6 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit
c.Unlock()
case <-c.stopCh:
- if c.publishChan != nil {
- pch := <-c.publishChan
- err := pch.Close()
- if err != nil {
- c.log.Error("publish channel close", "error", err)
- }
+ pch := <-c.publishChan
+ err := pch.Close()
+ if err != nil {
+ c.log.Error("publish channel close", "error", err)
}
if c.consumeChan != nil {
- err := c.consumeChan.Close()
+ err = c.consumeChan.Close()
if err != nil {
c.log.Error("consume channel close", "error", err)
}
}
- if c.conn != nil {
- err := c.conn.Close()
- if err != nil {
- c.log.Error("amqp connection close", "error", err)
- }
+
+ err = c.conn.Close()
+ if err != nil {
+ c.log.Error("amqp connection close", "error", err)
}
return