diff options
-rw-r--r-- | common/jobs/interface.go | 3 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 115 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 101 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 9 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 6 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/item.go | 31 |
6 files changed, 140 insertions, 125 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go index 3c29447d..a75ba760 100644 --- a/common/jobs/interface.go +++ b/common/jobs/interface.go @@ -11,7 +11,8 @@ type Consumer interface { Push(job *structs.Job) error Register(pipeline *pipeline.Pipeline) error Consume(pipeline *pipeline.Pipeline) error - List() []*pipeline.Pipeline + // List of the pipelines + List() []string Pause(pipeline string) Resume(pipeline string) diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index b4e35d35..ccf6b2ea 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -143,68 +143,69 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } func (j *JobsConsumer) Push(job *structs.Job) error { - const op = errors.Op("ephemeral_push") - // lock needed here to re-create a connections and channels in case of error + const op = errors.Op("rabbitmq_push") + // check if the pipeline registered + if _, ok := j.pipelines.Load(job.Options.Pipeline); !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + } + + // lock needed here to protect redial concurrent operation + // we may be in the redial state here j.RLock() defer j.RUnlock() // convert msg := FromJob(job) - // check if the pipeline registered - if _, ok := j.pipelines.Load(job.Options.Pipeline); ok { - // handle timeouts - if job.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - - delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - - if err != nil { - panic(err) - } - - err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - panic(err) - } - - // insert to the local, limited pipeline - err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: pack(job.Ident, 0, msg), - ContentType: contentType, - Timestamp: time.Now(), - Body: nil, - }) - if err != nil { - panic(err) - } - - return nil + // handle timeouts + if job.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + + _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + + if err != nil { + return errors.E(op, err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) } // insert to the local, limited pipeline - err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: pack(job.Ident, 0, msg), + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: pack(job.Ident, msg), ContentType: contentType, Timestamp: time.Now(), - Body: nil, + Body: msg.Body(), }) + if err != nil { - panic(err) + return errors.E(op, err) } return nil } - return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + // insert to the local, limited pipeline + err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: pack(job.Ident, msg), + ContentType: contentType, + Timestamp: time.Now(), + Body: msg.Body(), + }) + if err != nil { + return errors.E(op, err) + } + + return nil } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { @@ -220,11 +221,14 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") - if _, ok := j.pipelines.Load(pipeline.Name()); !ok { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) } + // protect connection (redial) + j.Lock() + defer j.Unlock() + var err error j.consumeChan, err = j.conn.Channel() if err != nil { @@ -256,8 +260,16 @@ func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobsConsumer) List() []*pipeline.Pipeline { - panic("implement me") +func (j *JobsConsumer) List() []string { + out := make([]string, 0, 2) + + j.pipelines.Range(func(key, value interface{}) bool { + pipe := key.(string) + out = append(out, pipe) + return true + }) + + return out } func (j *JobsConsumer) Pause(pipeline string) { @@ -268,6 +280,10 @@ func (j *JobsConsumer) Pause(pipeline string) { } } + // protect connection (redial) + j.Lock() + defer j.Unlock() + err := j.publishChan.Cancel(j.consumeID, true) if err != nil { j.logger.Error("cancel publish channel, forcing close", "error", err) @@ -284,6 +300,11 @@ func (j *JobsConsumer) Resume(pipeline string) { // mark pipeline as turned off j.pipelines.Store(pipeline, true) } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + var err error j.consumeChan, err = j.conn.Channel() if err != nil { diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 190e72e8..e5e580e0 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -5,25 +5,27 @@ import ( "time" json "github.com/json-iterator/go" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/utils" "github.com/streadway/amqp" ) const ( - rrID string = "rr-id" - rrJob string = "rr-job" - rrAttempt string = "rr-attempt" - rrMaxAttempts string = "rr-max_attempts" - rrTimeout string = "rr-timeout" - rrDelay string = "rr-delay" - rrRetryDelay string = "rr-retry_delay" + rrID string = "rr-id" + rrJob string = "rr-job" + // rrAttempt string = "rr-attempt" + // rrMaxAttempts string = "rr-max_attempts" + rrTimeout string = "rr-timeout" + rrDelay string = "rr-delay" + rrRetryDelay string = "rr-retry_delay" ) -func FromDelivery(d amqp.Delivery) *Item { - id, _, item, err := unpack(d) +func FromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + id, item, err := unpack(d) if err != nil { - panic(err) + return nil, errors.E(op, err) } return &Item{ Job: item.Job, @@ -33,7 +35,7 @@ func FromDelivery(d amqp.Delivery) *Item { Options: item.Options, AckFunc: d.Ack, NackFunc: d.Nack, - } + }, nil } func FromJob(job *structs.Job) *Item { @@ -41,14 +43,17 @@ func FromJob(job *structs.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, - Options: conv(*job.Options), + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: int64(job.Options.Delay), + Attempts: int64(job.Options.Attempts), + RetryDelay: int64(job.Options.RetryDelay), + Timeout: int64(job.Options.Timeout), + }, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains pluginName of job broker (usually PHP class). Job string `json:"job"` @@ -63,7 +68,7 @@ type Item struct { Headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options Options `json:"options,omitempty"` + Options *Options `json:"options,omitempty"` // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery AckFunc func(multiply bool) error @@ -85,21 +90,21 @@ type Options struct { Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. - Delay uint64 `json:"delay,omitempty"` + Delay int64 `json:"delay,omitempty"` // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). // Minimum valuable value is 2. - Attempts uint64 `json:"maxAttempts,omitempty"` + Attempts int64 `json:"maxAttempts,omitempty"` // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay uint64 `json:"retryDelay,omitempty"` + RetryDelay int64 `json:"retryDelay,omitempty"` // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout uint64 `json:"timeout,omitempty"` + Timeout int64 `json:"timeout,omitempty"` } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt uint64) bool { +func (o *Options) CanRetry(attempt int64) bool { // Attempts 1 and 0 has identical effect return o.Attempts > (attempt + 1) } @@ -143,7 +148,7 @@ func (j *Item) Context() ([]byte, error) { ID string `json:"id"` Job string `json:"job"` Headers map[string][]string `json:"headers"` - Timeout uint64 `json:"timeout"` + Timeout int64 `json:"timeout"` Pipeline string `json:"pipeline"` }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, ) @@ -163,51 +168,51 @@ func (j *Item) Nack() error { } // pack job metadata into headers -func pack(id string, attempt uint64, j *Item) amqp.Table { +func pack(id string, j *Item) amqp.Table { return amqp.Table{ - rrID: id, - rrJob: j.Job, - rrAttempt: attempt, - rrMaxAttempts: j.Options.Attempts, - rrTimeout: j.Options.Timeout, - rrDelay: j.Options.Delay, - rrRetryDelay: j.Options.RetryDelay, + rrID: id, + rrJob: j.Job, + // rrAttempt: attempt, + // rrMaxAttempts: j.Options.Attempts, + rrTimeout: j.Options.Timeout, + rrDelay: j.Options.Delay, + rrRetryDelay: j.Options.RetryDelay, } } // unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, attempt int, j *Item, err error) { - j = &Item{Payload: string(d.Body), Options: Options{}} +func unpack(d amqp.Delivery) (id string, j *Item, err error) { + j = &Item{Payload: string(d.Body), Options: &Options{}} if _, ok := d.Headers[rrID].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", rrID) + return "", nil, fmt.Errorf("missing header `%s`", rrID) } - if _, ok := d.Headers[rrAttempt].(uint64); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) - } + // if _, ok := d.Headers[rrAttempt].(uint64); !ok { + // return "", 0, nil, fmt.Errorf("missing header `%s`", rrAttempt) + // } if _, ok := d.Headers[rrJob].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", rrJob) + return "", nil, fmt.Errorf("missing header `%s`", rrJob) } j.Job = d.Headers[rrJob].(string) - if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { - j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) - } + // if _, ok := d.Headers[rrMaxAttempts].(uint64); ok { + // j.Options.Attempts = d.Headers[rrMaxAttempts].(uint64) + // } - if _, ok := d.Headers[rrTimeout].(uint64); ok { - j.Options.Timeout = d.Headers[rrTimeout].(uint64) + if _, ok := d.Headers[rrTimeout].(int64); ok { + j.Options.Timeout = d.Headers[rrTimeout].(int64) } - if _, ok := d.Headers[rrDelay].(uint64); ok { - j.Options.Delay = d.Headers[rrDelay].(uint64) + if _, ok := d.Headers[rrDelay].(int64); ok { + j.Options.Delay = d.Headers[rrDelay].(int64) } - if _, ok := d.Headers[rrRetryDelay].(uint64); ok { - j.Options.RetryDelay = d.Headers[rrRetryDelay].(uint64) + if _, ok := d.Headers[rrRetryDelay].(int64); ok { + j.Options.RetryDelay = d.Headers[rrRetryDelay].(int64) } - return d.Headers[rrID].(string), int(d.Headers[rrAttempt].(uint64)), j, nil + return d.Headers[rrID].(string), j, nil } diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go index 4d75dc0e..f21fe8de 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -72,8 +72,13 @@ func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { return } - // add task to the queue - j.pq.Insert(FromDelivery(msg)) + d, err := FromDelivery(msg) + if err != nil { + j.logger.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) case <-j.stop: return } diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 030dcae8..09e78249 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -121,11 +121,11 @@ func (j *JobBroker) Resume(pipeline string) { } } -func (j *JobBroker) List() []*pipeline.Pipeline { - out := make([]*pipeline.Pipeline, 0, 2) +func (j *JobBroker) List() []string { + out := make([]string, 0, 2) j.queues.Range(func(key, value interface{}) bool { - pipe := key.(*pipeline.Pipeline) + pipe := key.(string) out = append(out, pipe) return true }) diff --git a/plugins/jobs/brokers/ephemeral/item.go b/plugins/jobs/brokers/ephemeral/item.go index 76e83d00..211ac56a 100644 --- a/plugins/jobs/brokers/ephemeral/item.go +++ b/plugins/jobs/brokers/ephemeral/item.go @@ -13,14 +13,15 @@ func From(job *structs.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, - Options: conv(*job.Options), + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -35,7 +36,7 @@ type Item struct { Headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options Options `json:"options,omitempty"` + Options *Options `json:"options,omitempty"` } // Options carry information about how to handle given job. @@ -50,28 +51,10 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay uint64 `json:"delay,omitempty"` - // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). - // Minimum valuable value is 2. - Attempts uint64 `json:"maxAttempts,omitempty"` - - // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay uint64 `json:"retryDelay,omitempty"` - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout uint64 `json:"timeout,omitempty"` } -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt uint64) bool { - // Attempts 1 and 0 has identical effect - return o.Attempts > (attempt + 1) -} - -// RetryDuration returns retry delay duration in a form of time.Duration. -func (o *Options) RetryDuration() time.Duration { - return time.Second * time.Duration(o.RetryDelay) -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) |