diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 /plugins | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 108 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 37 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/requeue.go | 34 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 18 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 27 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/requeue.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 9 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 30 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/requeue.go | 25 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 28 | ||||
-rw-r--r-- | plugins/jobs/protocol.go | 11 | ||||
-rw-r--r-- | plugins/jobs/response_protocol.md | 13 |
13 files changed, 155 insertions, 213 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d7425858..429953e1 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -54,7 +54,6 @@ type JobConsumer struct { listeners uint32 stopCh chan struct{} - requeueCh chan *Item } // NewAMQPConsumer initializes rabbitmq pipeline @@ -112,7 +111,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, exclusive: pipeCfg.Exclusive, multipleAck: pipeCfg.MultipleAck, requeueOnFail: pipeCfg.RequeueOnFail, - requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -137,7 +135,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // run redialer and requeue listener for the connection jb.redialer() - jb.requeueListener() return jb, nil } @@ -184,7 +181,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con exclusive: pipeline.Bool(exclusive, false), multipleAck: pipeline.Bool(multipleAsk, false), requeueOnFail: pipeline.Bool(requeueOnFail, false), - requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -213,7 +209,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // run redialer for the connection jb.redialer() - jb.requeueListener() return jb, nil } @@ -228,9 +223,17 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } - // lock needed here to protect redial concurrent operation - // we may be in the redial state here + err := j.handleItem(ctx, fromJob(job)) + if err != nil { + return errors.E(op, err) + } + + return nil +} +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") select { case pch := <-j.publishChan: // return the channel back @@ -239,40 +242,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { }() // convert - msg := fromJob(job) - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } - - err = j.handleItem(msg, p, pch) + table, err := pack(msg.ID(), msg) if err != nil { return errors.E(op, err) } - return nil + const op = errors.Op("amqp_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + return errors.E(op, err) + } - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } -// handleItem -func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error { - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - // delay cache optimization. - // If user already declared a queue with a delay, do not redeclare and rebind the queue - // Before -> 2.5k RPS with redeclaration - // After -> 30k RPS - if _, exists := j.delayCache[tmpQ]; exists { // insert to the local, limited pipeline - err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now().UTC(), @@ -284,29 +282,16 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) return errors.E(op, err) } - return nil - } - - _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } + j.delayCache[tmpQ] = struct{}{} - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) + return nil } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) @@ -315,25 +300,10 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) return errors.E(op, err) } - j.delayCache[tmpQ] = struct{}{} - return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) } - - // insert to the local, limited pipeline - err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -491,8 +461,6 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { func (j *JobConsumer) Stop(context.Context) error { j.stopCh <- struct{}{} - close(j.requeueCh) - pipe := j.pipeline.Load().(*pipeline.Pipeline) j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 908dbd15..f252acd8 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -1,6 +1,8 @@ package amqp import ( + "context" + "fmt" "time" json "github.com/json-iterator/go" @@ -52,7 +54,7 @@ type Options struct { // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error - requeueCh chan *Item + requeueFn func(context.Context, *Item) error multipleAsk bool requeue bool @@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + errAck := i.Options.nack(false, true) + if errAck != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck) + } + + return err } + + // ack the job + err = i.Options.ack(false) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ @@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - // requeue channel - item.Options.requeueCh = j.requeueCh + + // requeue func + item.Options.requeueFn = j.handleItem return i, nil } @@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go deleted file mode 100644 index a2b3b26c..00000000 --- a/plugins/jobs/drivers/amqp/requeue.go +++ /dev/null @@ -1,34 +0,0 @@ -package amqp - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - pch := <-j.publishChan - - headers, err := pack(item.ID(), item) - if err != nil { - j.publishChan <- pch - j.log.Error("requeue pack", "error", err) - continue - } - - err = j.handleItem(item, headers, pch) - if err != nil { - j.publishChan <- pch - j.log.Error("requeue handle item", "error", err) - continue - } - - j.publishChan <- pch - } - } - }() -} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 21b05b16..f41a2c8a 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config var pipeCfg Config var globalCfg GlobalCfg + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(configKey, &pipeCfg) if err != nil { return nil, errors.E(op, err) @@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } @@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // PARSE CONFIGURATION ------- var globalCfg GlobalCfg + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(pluginName, &globalCfg) if err != nil { return nil, errors.E(op, err) @@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index a5aa1791..47336b43 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -2,12 +2,12 @@ package beanstalk import ( "bytes" + "context" "encoding/gob" "time" "github.com/beanstalkd/go-beanstalk" json "github.com/json-iterator/go" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -53,7 +53,7 @@ type Options struct { // Private ================ id uint64 conn *beanstalk.Conn - requeueCh chan *Item + requeueFn func(context.Context, *Item) error } // DelayDuration returns delay duration in a form of time.Duration. @@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err } + + // delete old job + err = i.Options.conn.Delete(i.Options.id) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } func fromJob(job *job.Job) *Item { @@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { } out.Options.conn = j.pool.conn out.Options.id = id - out.Options.requeueCh = j.requeueCh + out.Options.requeueFn = j.handleItem return nil } diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go deleted file mode 100644 index 21053940..00000000 --- a/plugins/jobs/drivers/beanstalk/requeue.go +++ /dev/null @@ -1,24 +0,0 @@ -package beanstalk - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index c1171ae2..9fab8d24 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -118,6 +118,10 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } +func (i *Item) Recycle() { + i.Options = nil +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 8d93b12c..5d741358 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,8 +50,7 @@ type JobConsumer struct { client *sqs.Client queueURL *string - requeueCh chan *Item - pauseCh chan struct{} + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } @@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index a761d6bd..f5fac0b3 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -64,7 +64,7 @@ type Options struct { queue *string receiptHandler *string client *sqs.Client - requeueCh chan *Item + requeueFn func(context.Context, *Item) error } // DelayDuration returns delay duration in a form of time.Duration. @@ -144,12 +144,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + // requeue message + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err } + + // Delete job from the queue only after successful requeue + _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } func fromJob(job *job.Job) *Item { @@ -227,7 +243,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { client: j.client, queue: j.queue, receiptHandler: msg.ReceiptHandle, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }, } diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go deleted file mode 100644 index 87e885e0..00000000 --- a/plugins/jobs/drivers/sqs/requeue.go +++ /dev/null @@ -1,25 +0,0 @@ -package sqs - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - // TODO(rustatian): what context to use - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 8ea18cfd..87559034 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -104,19 +104,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } -func (p *Plugin) getPayload(body, context []byte) *payload.Payload { - pld := p.pldPool.Get().(*payload.Payload) - pld.Body = body - pld.Context = context - return pld -} - -func (p *Plugin) putPayload(pld *payload.Payload) { - pld.Body = nil - pld.Context = nil - p.pldPool.Put(pld) -} - func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") @@ -261,6 +248,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + // free the resources + jb.Recycle() // return payload p.putPayload(exec) } @@ -565,3 +554,16 @@ func (p *Plugin) collectJobsEvents(event interface{}) { } } } + +func (p *Plugin) getPayload(body, context []byte) *payload.Payload { + pld := p.pldPool.Get().(*payload.Payload) + pld.Body = body + pld.Context = context + return pld +} + +func (p *Plugin) putPayload(pld *payload.Payload) { + pld.Body = nil + pld.Context = nil + p.pldPool.Put(pld) +} diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go index 691369d0..9d769fdf 100644 --- a/plugins/jobs/protocol.go +++ b/plugins/jobs/protocol.go @@ -10,8 +10,8 @@ import ( type Type uint32 const ( - Error Type = iota - NoError + NoError Type = iota + Error ) // internal worker protocol (jobs mode) @@ -19,7 +19,7 @@ type protocol struct { // message type, see Type T Type `json:"type"` // Payload - Data []byte `json:"data"` + Data json.RawMessage `json:"data"` } type errorResp struct { @@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { return errors.E(op, err) } - log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) + log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) if er.Requeue { err = jb.Requeue(er.Headers, er.Delay) @@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { } return nil } + + return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg)) + default: err = jb.Ack() if err != nil { diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index 77c78cb8..c89877e3 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -15,7 +15,8 @@ Types are: - `NO_ERROR`: contains only `type` and empty `data`. - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, - `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value. + `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap + with string key and array of strings as a value. For example: @@ -41,12 +42,10 @@ For example: "headers": [ { "test": [ - { - "ttt": "11", - "ggg": "22" - } - ], - "test2": "2" + "1", + "2", + "3" + ] } ], "delay_seconds": 10 |