summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/requeue.go
blob: a2b3b26c32e0bdb475f0af7013ec4a3af48e4dc7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package amqp

// requeueListener should handle items passed to requeue
func (j *JobConsumer) requeueListener() {
	go func() {
		for { //nolint:gosimple
			select {
			case item, ok := <-j.requeueCh:
				if !ok {
					j.log.Info("requeue channel closed")
					return
				}

				pch := <-j.publishChan

				headers, err := pack(item.ID(), item)
				if err != nil {
					j.publishChan <- pch
					j.log.Error("requeue pack", "error", err)
					continue
				}

				err = j.handleItem(item, headers, pch)
				if err != nil {
					j.publishChan <- pch
					j.log.Error("requeue handle item", "error", err)
					continue
				}

				j.publishChan <- pch
			}
		}
	}()
}