summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 11:17:29 +0300
committerValery Piashchynski <[email protected]>2021-07-10 11:17:29 +0300
commit96d437f96785ada6aa5eb6d6ec9505b977ab3e74 (patch)
treee413299670acfe94d983029f9b9137571d925b10
parentfa57fa609d14e4ebf4cbffc154804402906eecaa (diff)
Update Consumer interface, List method returns []string of the
pipelines. Update packing and unpacking Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--common/jobs/interface.go3
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go115
-rw-r--r--plugins/jobs/brokers/amqp/item.go101
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go9
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go6
-rw-r--r--plugins/jobs/brokers/ephemeral/item.go31
6 files changed, 140 insertions, 125 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index 3c29447d..a75ba760 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -11,7 +11,8 @@ type Consumer interface {
Push(job *structs.Job) error
Register(pipeline *pipeline.Pipeline) error
Consume(pipeline *pipeline.Pipeline) error
- List() []*pipeline.Pipeline
+ // List of the pipelines
+ List() []string
Pause(pipeline string)
Resume(pipeline string)
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index b4e35d35..ccf6b2ea 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -143,68 +143,69 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
}
func (j *JobsConsumer) Push(job *structs.Job) error {
- const op = errors.Op("ephemeral_push")
- // lock needed here to re-create a connections and channels in case of error
+ const op = errors.Op("rabbitmq_push")
+ // check if the pipeline registered
+ if _, ok := j.pipelines.Load(job.Options.Pipeline); !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
+ }
+
+ // lock needed here to protect redial concurrent operation
+ // we may be in the redial state here
j.RLock()
defer j.RUnlock()
// convert
msg := FromJob(job)
- // check if the pipeline registered
- if _, ok := j.pipelines.Load(job.Options.Pipeline); ok {
- // handle timeouts
- if job.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
-
- delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
-
- if err != nil {
- panic(err)
- }
-
- err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- panic(err)
- }
-
- // insert to the local, limited pipeline
- err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, 0, msg),
- ContentType: contentType,
- Timestamp: time.Now(),
- Body: nil,
- })
- if err != nil {
- panic(err)
- }
-
- return nil
+ // handle timeouts
+ if job.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+ _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
}
// insert to the local, limited pipeline
- err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: pack(job.Ident, 0, msg),
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: pack(job.Ident, msg),
ContentType: contentType,
Timestamp: time.Now(),
- Body: nil,
+ Body: msg.Body(),
})
+
if err != nil {
- panic(err)
+ return errors.E(op, err)
}
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
+ // insert to the local, limited pipeline
+ err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: pack(job.Ident, msg),
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ Body: msg.Body(),
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
@@ -220,11 +221,14 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
-
if _, ok := j.pipelines.Load(pipeline.Name()); !ok {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name()))
}
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
var err error
j.consumeChan, err = j.conn.Channel()
if err != nil {
@@ -256,8 +260,16 @@ func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
return nil
}
-func (j *JobsConsumer) List() []*pipeline.Pipeline {
- panic("implement me")
+func (j *JobsConsumer) List() []string {
+ out := make([]string, 0, 2)
+
+ j.pipelines.Range(func(key, value interface{}) bool {
+ pipe := key.(string)
+ out = append(out, pipe)
+ return true
+ })
+
+ return out
}
func (j *JobsConsumer) Pause(pipeline string) {
@@ -268,6 +280,10 @@ func (j *JobsConsumer) Pause(pipeline string) {
}
}
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
err := j.publishChan.Cancel(j.consumeID, true)
if err != nil {
j.logger.Error("cancel publish channel, forcing close", "error", err)
@@ -284,6 +300,11 @@ func (j *JobsConsumer) Resume(pipeline string) {
// mark pipeline as turned off
j.pipelines.Store(pipeline, true)
}
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
var err error
j.consumeChan, err = j.conn.Channel()
if err != nil {
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 190e72e8..e5e580e0 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -5,25 +5,27 @@ import (
"time"
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/utils"
"github.com/streadway/amqp"
)
const (
- rrID string = "rr-id"
- rrJob string = "rr-job"
- rrAttempt string = "rr-attempt"
- rrMaxAttempts string = "rr-max_attempts"
- rrTimeout string = "rr-timeout"
- rrDelay string = "rr-delay"
- rrRetryDelay string = "rr-retry_delay"
+ rrID string = "rr-id"
+ rrJob string = "rr-job"
+ // rrAttempt string = "rr-attempt"
+ // rrMaxAttempts string = "rr-max_attempts"
+ rrTimeout string = "rr-timeout"
+ rrDelay string = "rr-delay"
+ rrRetryDelay string = "rr-retry_delay"
)
-func FromDelivery(d amqp.Delivery) *Item {
- id, _, item, err := unpack(d)
+func FromDelivery(d amqp.Delivery) (*Item, error) {
+ const op = errors.Op("from_delivery_convert")
+ id, item, err := unpack(d)
if err != nil {
- panic(err)
+ return nil, errors.E(op, err)
}
return &Item{
Job: item.Job,
@@ -33,7 +35,7 @@ func FromDelivery(d amqp.Delivery) *Item {
Options: item.Options,
AckFunc: d.Ack,
NackFunc: d.Nack,
- }
+ }, nil
}
func FromJob(job *structs.Job) *Item {
@@ -41,14 +43,17 @@ func FromJob(job *structs.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
- Options: conv(*job.Options),
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: int64(job.Options.Delay),
+ Attempts: int64(job.Options.Attempts),
+ RetryDelay: int64(job.Options.RetryDelay),
+ Timeout: int64(job.Options.Timeout),
+ },
}
}
-func conv(jo structs.Options) Options {
- return Options(jo)
-}
-
type Item struct {
// Job contains pluginName of job broker (usually PHP class).
Job string `json:"job"`
@@ -63,7 +68,7 @@ type Item struct {
Headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options Options `json:"options,omitempty"`
+ Options *Options `json:"options,omitempty"`
// Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
AckFunc func(multiply bool) error
@@ -85,21 +90,21 @@ type Options struct {
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
- Delay uint64 `json:"delay,omitempty"`
+ Delay int64 `json:"delay,omitempty"`
// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
// Minimum valuable value is 2.
- Attempts uint64 `json:"maxAttempts,omitempty"`
+ Attempts int64 `json:"maxAttempts,omitempty"`
// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay uint64 `json:"retryDelay,omitempty"`
+ RetryDelay int64 `json:"retryDelay,omitempty"`
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout uint64 `json:"timeout,omitempty"`
+ Timeout int64 `json:"timeout,omitempty"`
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt uint64) bool {
+func (o *Options) CanRetry(attempt int64) bool {
// Attempts 1 and 0 has identical effect
return o.Attempts > (attempt + 1)
}
@@ -143,7 +148,7 @@ func (j *Item) Context() ([]byte, error) {
ID string `json:"id"`
Job string `json:"job"`
Headers map[string][]string `json:"headers"`
- Timeout uint64 `json:"timeout"`
+ Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
}{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
)
@@ -163,51 +168,51 @@ func (j *Item) Nack() error {
}
// pack job metadata into headers
-func pack(id string, attempt uint64, j *Item) amqp.Table {
+func pack(id string, j *Item) amqp.Table {
return amqp.Table{
- rrID: id,
- rrJob: j.Job,
- rrAttempt: attempt,
- rrMaxAttempts: j.Options.Attempts,
- rrTimeout: j.Options.Timeout,
- rrDelay: j.Options.Delay,
- rrRetryDelay: j.Options.RetryDelay,
+ rrID: id,
+ rrJob: j.Job,
+ // rrAttempt: attempt,
+ // rrMaxAttempts: j.Options.Attempts,
+ rrTimeout: j.Options.Timeout,
+ rrDelay: j.Options.Delay,
+ rrRetryDelay: j.Options.RetryDelay,
}
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, attempt int, j *Item, err error) {
- j = &Item{Payload: string(d.Body), Options: Options{}}
+func unpack(d amqp.Delivery) (id string, j *Item, err error) {
+ j = &Item{Payload: string(d.Body), Options: &Options{}}
if _, ok := d.Headers[rrID].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrID)
+ return "", nil, fmt.Errorf("missing header `%s`", rrID)
}
- if _, ok := d.Headers[rrAttempt].(uint64); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
- }
+ // if _, ok := d.Headers[rrAttempt].(uint64); !ok {
+ // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt)
+ // }
if _, ok := d.Headers[rrJob].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob)
+ return "", nil, fmt.Errorf("missing header `%s`", rrJob)
}
j.Job = d.Headers[rrJob].(string)
- if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
- j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
- }
+ // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok {
+ // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64)
+ // }
- if _, ok := d.Headers[rrTimeout].(uint64); ok {
- j.Options.Timeout = d.Headers[rrTimeout].(uint64)
+ if _, ok := d.Headers[rrTimeout].(int64); ok {
+ j.Options.Timeout = d.Headers[rrTimeout].(int64)
}
- if _, ok := d.Headers[rrDelay].(uint64); ok {
- j.Options.Delay = d.Headers[rrDelay].(uint64)
+ if _, ok := d.Headers[rrDelay].(int64); ok {
+ j.Options.Delay = d.Headers[rrDelay].(int64)
}
- if _, ok := d.Headers[rrRetryDelay].(uint64); ok {
- j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64)
+ if _, ok := d.Headers[rrRetryDelay].(int64); ok {
+ j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64)
}
- return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil
+ return d.Headers[rrID].(string), j, nil
}
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index 4d75dc0e..f21fe8de 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -72,8 +72,13 @@ func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
return
}
- // add task to the queue
- j.pq.Insert(FromDelivery(msg))
+ d, err := FromDelivery(msg)
+ if err != nil {
+ j.logger.Error("amqp delivery convert", "error", err)
+ continue
+ }
+ // insert job into the main priority queue
+ j.pq.Insert(d)
case <-j.stop:
return
}
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 030dcae8..09e78249 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -121,11 +121,11 @@ func (j *JobBroker) Resume(pipeline string) {
}
}
-func (j *JobBroker) List() []*pipeline.Pipeline {
- out := make([]*pipeline.Pipeline, 0, 2)
+func (j *JobBroker) List() []string {
+ out := make([]string, 0, 2)
j.queues.Range(func(key, value interface{}) bool {
- pipe := key.(*pipeline.Pipeline)
+ pipe := key.(string)
out = append(out, pipe)
return true
})
diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go
index 76e83d00..211ac56a 100644
--- a/plugins/jobs/brokers/ephemeral/item.go
+++ b/plugins/jobs/brokers/ephemeral/item.go
@@ -13,14 +13,15 @@ func From(job *structs.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
- Options: conv(*job.Options),
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
}
}
-func conv(jo structs.Options) Options {
- return Options(jo)
-}
-
type Item struct {
// Job contains name of job broker (usually PHP class).
Job string `json:"job"`
@@ -35,7 +36,7 @@ type Item struct {
Headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options Options `json:"options,omitempty"`
+ Options *Options `json:"options,omitempty"`
}
// Options carry information about how to handle given job.
@@ -50,28 +51,10 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay uint64 `json:"delay,omitempty"`
- // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
- // Minimum valuable value is 2.
- Attempts uint64 `json:"maxAttempts,omitempty"`
-
- // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay uint64 `json:"retryDelay,omitempty"`
-
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout uint64 `json:"timeout,omitempty"`
}
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt uint64) bool {
- // Attempts 1 and 0 has identical effect
- return o.Attempts > (attempt + 1)
-}
-
-// RetryDuration returns retry delay duration in a form of time.Duration.
-func (o *Options) RetryDuration() time.Duration {
- return time.Second * time.Duration(o.RetryDelay)
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)