blob: a2b3b26c32e0bdb475f0af7013ec4a3af48e4dc7 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package amqp
// requeueListener starts a background goroutine that drains j.requeueCh.
//
// For every received item the goroutine:
//  1. takes a value from j.publishChan,
//  2. builds headers with pack(item.ID(), item),
//  3. passes the item, the headers and that value to j.handleItem,
//  4. puts the value back on j.publishChan, whether or not a step failed.
//
// A failure in pack or handleItem is logged and the loop moves on to the next
// item. The goroutine exits, after logging, once j.requeueCh is closed.
func (j *JobConsumer) requeueListener() {
	go func() {
		// Ranging over the channel ends the loop exactly when the channel
		// is closed and drained.
		for item := range j.requeueCh {
			pch := <-j.publishChan

			// failedStep names the step being attempted; it is only logged
			// when that step returns an error.
			failedStep := "requeue pack"
			headers, err := pack(item.ID(), item)
			if err == nil {
				failedStep = "requeue handle item"
				err = j.handleItem(item, headers, pch)
			}

			// Hand the value back before logging so it is returned on
			// every path.
			j.publishChan <- pch

			if err != nil {
				j.log.Error(failedStep, "error", err)
			}
		}

		j.log.Info("requeue channel closed")
	}()
}
|