summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 01:38:36 +0300
committerValery Piashchynski <[email protected]>2021-07-10 01:39:09 +0300
commitfa57fa609d14e4ebf4cbffc154804402906eecaa (patch)
tree22bb3ab2c1f9ea9bd8f54f76d471f9bfc556ff75
parent242022e4a64acef9eda87fa1b8b3b026d693b332 (diff)
Properly parse amqp items (jobs).
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go15
-rw-r--r--plugins/jobs/brokers/amqp/headers.go68
-rw-r--r--plugins/jobs/brokers/amqp/item.go86
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go2
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml9
5 files changed, 105 insertions, 75 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 9ac47269..b4e35d35 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -144,22 +144,27 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
func (j *JobsConsumer) Push(job *structs.Job) error {
const op = errors.Op("ephemeral_push")
+ // lock needed here to re-create a connections and channels in case of error
j.RLock()
defer j.RUnlock()
+ // convert
+ msg := FromJob(job)
+
// check if the pipeline registered
if _, ok := j.pipelines.Load(job.Options.Pipeline); ok {
// handle timeouts
if job.Options.DelayDuration() > 0 {
- // pub
+ // TODO declare separate method for this if condition
+
delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
_, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
dlx: j.exchangeName,
dlxRoutingKey: j.routingKey,
- dlxTTL: 100,
- dlxExpires: 200,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
})
if err != nil {
@@ -173,7 +178,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, 0, job),
+ Headers: pack(job.Ident, 0, msg),
ContentType: contentType,
Timestamp: time.Now(),
Body: nil,
@@ -187,7 +192,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// insert to the local, limited pipeline
err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- //Headers: pack(job.Ident, 0, job),
+ Headers: pack(job.Ident, 0, msg),
ContentType: contentType,
Timestamp: time.Now(),
Body: nil,
diff --git a/plugins/jobs/brokers/amqp/headers.go b/plugins/jobs/brokers/amqp/headers.go
deleted file mode 100644
index b1f9c89d..00000000
--- a/plugins/jobs/brokers/amqp/headers.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package amqp
-
-import (
- "fmt"
-
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
- "github.com/streadway/amqp"
-)
-
-const (
- rrID string = "rr-id"
- rrJob string = "rr-job"
- rrAttempt string = "rr-attempt"
- rrMaxAttempts string = "rr-max_attempts"
- rrTimeout string = "rr-timeout"
- rrDelay string = "rr-delay"
- rrRetryDelay string = "rr-retry_delay"
-)
-
-// pack job metadata into headers
-func pack(id string, attempt uint64, j *structs.Job) amqp.Table {
- return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- rrAttempt: attempt,
- rrMaxAttempts: j.Options.Attempts,
- rrTimeout: j.Options.Timeout,
- rrDelay: j.Options.Delay,
- rrRetryDelay: j.Options.RetryDelay,
- }
-}
-
-// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, attempt int, j *structs.Job, err error) { //nolint:deadcode,unused
- j = &structs.Job{Payload: string(d.Body), Options: &structs.Options{}}
-
- if _, ok := d.Headers[rrID].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
- }
-
- if _, ok := d.Headers[rrAttempt].(uint64); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
- }
-
- if _, ok := d.Headers[rrJob].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
- }
-
- j.Job = d.Headers[rrJob].(string)
-
- if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
- j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
- }
-
- if _, ok := d.Headers[rrTimeout].(uint64); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(uint64)
- }
-
- if _, ok := d.Headers[rrDelay].(uint64); ok {
- j.Options.Delay = d.Headers[rrDelay].(uint64)
- }
-
- if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
- }
-
- return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
-}
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 4751df58..190e72e8 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -1,20 +1,54 @@
package amqp
import (
+ "fmt"
"time"
json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/utils"
"github.com/streadway/amqp"
)
-func From(d amqp.Delivery) *Item {
+const (
+ rrID string = "rr-id"
+ rrJob string = "rr-job"
+ rrAttempt string = "rr-attempt"
+ rrMaxAttempts string = "rr-max_attempts"
+ rrTimeout string = "rr-timeout"
+ rrDelay string = "rr-delay"
+ rrRetryDelay string = "rr-retry_delay"
+)
+
+func FromDelivery(d amqp.Delivery) *Item {
+ id, _, item, err := unpack(d)
+ if err != nil {
+ panic(err)
+ }
return &Item{
+ Job: item.Job,
+ Ident: id,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
AckFunc: d.Ack,
NackFunc: d.Nack,
}
}
+func FromJob(job *structs.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: conv(*job.Options),
+ }
+}
+
+func conv(jo structs.Options) Options {
+ return Options(jo)
+}
+
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
@@ -127,3 +161,53 @@ func (j *Item) Ack() error {
func (j *Item) Nack() error {
return j.NackFunc(false, false)
}
+
+// pack job metadata into headers
+func pack(id string, attempt uint64, j *Item) amqp.Table {
+ return amqp.Table{
+ rrID: id,
+ rrJob: j.Job,
+ rrAttempt: attempt,
+ rrMaxAttempts: j.Options.Attempts,
+ rrTimeout: j.Options.Timeout,
+ rrDelay: j.Options.Delay,
+ rrRetryDelay: j.Options.RetryDelay,
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(d amqp.Delivery) (id string, attempt int, j *Item, err error) {
+ j = &Item{Payload: string(d.Body), Options: Options{}}
+
+ if _, ok := d.Headers[rrID].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
+ }
+
+ if _, ok := d.Headers[rrAttempt].(uint64); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
+ }
+
+ if _, ok := d.Headers[rrJob].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
+ }
+
+ j.Job = d.Headers[rrJob].(string)
+
+ if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
+ j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
+ }
+
+ if _, ok := d.Headers[rrTimeout].(uint64); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(uint64)
+ }
+
+ if _, ok := d.Headers[rrDelay].(uint64); ok {
+ j.Options.Delay = d.Headers[rrDelay].(uint64)
+ }
+
+ if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
+ }
+
+ return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
+}
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index 7e722889..4d75dc0e 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -73,7 +73,7 @@ func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
}
// add task to the queue
- j.pq.Insert(From(msg))
+ j.pq.Insert(FromDelivery(msg))
case <-j.stop:
return
}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 37fce8e2..6ff2ab70 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -62,6 +62,15 @@ jobs:
exchange_type: direct
routing_key: test
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 1000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
test-2:
driver: beanstalk
priority: 11