diff options
Diffstat (limited to 'plugins/amqp/amqpjobs/listener.go')
-rw-r--r-- | plugins/amqp/amqpjobs/listener.go | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/plugins/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go new file mode 100644 index 00000000..0156d55c --- /dev/null +++ b/plugins/amqp/amqpjobs/listener.go @@ -0,0 +1,25 @@ +package amqpjobs + +import amqp "github.com/rabbitmq/amqp091-go" + +func (j *consumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-deliv: + if !ok { + j.log.Info("delivery channel closed, leaving the rabbit listener") + return + } + + d, err := j.fromDelivery(msg) + if err != nil { + j.log.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) + } + } + }() +} |