summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 17:20:33 +0300
committerGitHub <[email protected]>2021-08-31 17:20:33 +0300
commit0437d1f58514f694ea86e8176e621c009cd510f9 (patch)
treeaf0ea88580e852832d172e34b9698b1c87c98110
parent83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (diff)
parent3ea498a1bf0dc29c8273b01417ff8d17ad5e97c3 (diff)
#776: fix(amqp): type casting for the amqp headersv2.4.0-rc.1
#776: fix(amqp): type casting for the amqp headers
-rw-r--r--CHANGELOG.md5
-rw-r--r--plugins/amqp/amqpjobs/item.go18
2 files changed, 18 insertions, 5 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93bcf13b..fcec90cb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,10 @@ v2.4.0 (_.08.2021)
- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime.
Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
-- Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
+- ✏️ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
+- ✏️ Support for the Docker images via GitHub packages.
+- ✏️ Go 1.17 support.
+
## 🩹 Fixes:
- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750).
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index 66b70a36..04385afe 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -224,15 +224,25 @@ func (c *consumer) unpack(d amqp.Delivery) (*Item, error) {
}
}
- if _, ok := d.Headers[job.RRDelay].(int64); ok {
- item.Options.Delay = d.Headers[job.RRDelay].(int64)
+ if t, ok := d.Headers[job.RRDelay]; ok {
+ switch t.(type) {
+ case int, int16, int32, int64:
+ item.Options.Delay = t.(int64)
+ default:
+ c.log.Warn("unknown delay type", "want:", "int, int16, int32, int64", "actual", t)
+ }
}
- if _, ok := d.Headers[job.RRPriority]; !ok {
+ if t, ok := d.Headers[job.RRPriority]; !ok {
// set pipe's priority
item.Options.Priority = c.priority
} else {
- item.Options.Priority = d.Headers[job.RRPriority].(int64)
+ switch t.(type) {
+ case int, int16, int32, int64:
+ item.Options.Priority = t.(int64)
+ default:
+ c.log.Warn("unknown priority type", "want:", "int, int16, int32, int64", "actual", t)
+ }
}
return item, nil