summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/amqp/amqpjobs/item.go')
-rw-r--r--plugins/amqp/amqpjobs/item.go18
1 files changed, 14 insertions, 4 deletions
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index 66b70a36..04385afe 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -224,15 +224,25 @@ func (c *consumer) unpack(d amqp.Delivery) (*Item, error) {
}
}
- if _, ok := d.Headers[job.RRDelay].(int64); ok {
- item.Options.Delay = d.Headers[job.RRDelay].(int64)
+ if t, ok := d.Headers[job.RRDelay]; ok {
+ switch t.(type) {
+ case int, int16, int32, int64:
+ item.Options.Delay = t.(int64)
+ default:
+ c.log.Warn("unknown delay type", "want:", "int, int16, int32, int64", "actual", t)
+ }
}
- if _, ok := d.Headers[job.RRPriority]; !ok {
+ if t, ok := d.Headers[job.RRPriority]; !ok {
// set pipe's priority
item.Options.Priority = c.priority
} else {
- item.Options.Priority = d.Headers[job.RRPriority].(int64)
+ switch t.(type) {
+ case int, int16, int32, int64:
+ item.Options.Priority = t.(int64)
+ default:
+ c.log.Warn("unknown priority type", "want:", "int, int16, int32, int64", "actual", t)
+ }
}
return item, nil