summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/listener.go
blob: 75c61cad947a6eadfc26c9ce4706dfc72e4f1538 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package amqpjobs

import amqp "github.com/rabbitmq/amqp091-go"

func (c *consumer) listener(deliv <-chan amqp.Delivery) {
	go func() {
		for { //nolint:gosimple
			select {
			case msg, ok := <-deliv:
				if !ok {
					c.log.Info("delivery channel closed, leaving the rabbit listener")
					return
				}

				d, err := c.fromDelivery(msg)
				if err != nil {
					c.log.Error("amqp delivery convert", "error", err)
					continue
				}
				// insert job into the main priority queue
				c.pq.Insert(d)
			}
		}
	}()
}