diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
26 files changed, 585 insertions, 239 deletions
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go index fb5b83de..05ddf5ef 100644 --- a/pkg/priority_queue/binary_heap_test.go +++ b/pkg/priority_queue/binary_heap_test.go @@ -40,6 +40,8 @@ func (t Test) Priority() int64 { return int64(t) } +func (t Test) Recycle() {} + func TestBinHeap_Init(t *testing.T) { a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 9efa4652..0034cbd3 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -28,4 +28,7 @@ type Item interface { // Requeue - put the message back to the queue with the optional delay Requeue(headers map[string][]string, delay int64) error + + // Recycle frees resources allocated by the Item + Recycle() } diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d7425858..429953e1 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -54,7 +54,6 @@ type JobConsumer struct { listeners uint32 stopCh chan struct{} - requeueCh chan *Item } // NewAMQPConsumer initializes rabbitmq pipeline @@ -112,7 +111,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, exclusive: pipeCfg.Exclusive, multipleAck: pipeCfg.MultipleAck, requeueOnFail: pipeCfg.RequeueOnFail, - requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -137,7 +135,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // run redialer and requeue listener for the connection jb.redialer() - jb.requeueListener() return jb, nil } @@ -184,7 +181,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con exclusive: pipeline.Bool(exclusive, false), multipleAck: pipeline.Bool(multipleAsk, false), requeueOnFail: pipeline.Bool(requeueOnFail, false), - requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -213,7 +209,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // run redialer for the connection jb.redialer() - jb.requeueListener() return jb, nil } @@ -228,9 +223,17 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } - // lock needed here to protect redial concurrent operation - // we may be in the redial state here + err := j.handleItem(ctx, fromJob(job)) + if err != nil { + return errors.E(op, err) + } + + return nil +} +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") select { case pch := <-j.publishChan: // return the channel back @@ -239,40 +242,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { }() // convert - msg := fromJob(job) - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } - - err = j.handleItem(msg, p, pch) + table, err := pack(msg.ID(), msg) if err != nil { return errors.E(op, err) } - return nil + const op = errors.Op("amqp_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + return errors.E(op, err) + } - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } -// handleItem -func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error { - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - // delay cache optimization. - // If user already declared a queue with a delay, do not redeclare and rebind the queue - // Before -> 2.5k RPS with redeclaration - // After -> 30k RPS - if _, exists := j.delayCache[tmpQ]; exists { // insert to the local, limited pipeline - err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now().UTC(), @@ -284,29 +282,16 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) return errors.E(op, err) } - return nil - } - - _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } + j.delayCache[tmpQ] = struct{}{} - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) + return nil } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) @@ -315,25 +300,10 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) return errors.E(op, err) } - j.delayCache[tmpQ] = struct{}{} - return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) } - - // insert to the local, limited pipeline - err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -491,8 +461,6 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { func (j *JobConsumer) Stop(context.Context) error { j.stopCh <- struct{}{} - close(j.requeueCh) - pipe := j.pipeline.Load().(*pipeline.Pipeline) j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 908dbd15..f252acd8 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -1,6 +1,8 @@ package amqp import ( + "context" + "fmt" "time" json "github.com/json-iterator/go" @@ -52,7 +54,7 @@ type Options struct { // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error - requeueCh chan *Item + requeueFn func(context.Context, *Item) error multipleAsk bool requeue bool @@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + errAck := i.Options.nack(false, true) + if errAck != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck) + } + + return err } + + // ack the job + err = i.Options.ack(false) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ @@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - // requeue channel - item.Options.requeueCh = j.requeueCh + + // requeue func + item.Options.requeueFn = j.handleItem return i, nil } @@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go deleted file mode 100644 index a2b3b26c..00000000 --- a/plugins/jobs/drivers/amqp/requeue.go +++ /dev/null @@ -1,34 +0,0 @@ -package amqp - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - pch := <-j.publishChan - - headers, err := pack(item.ID(), item) - if err != nil { - j.publishChan <- pch - j.log.Error("requeue pack", "error", err) - continue - } - - err = j.handleItem(item, headers, pch) - if err != nil { - j.publishChan <- pch - j.log.Error("requeue handle item", "error", err) - continue - } - - j.publishChan <- pch - } - } - }() -} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 21b05b16..f41a2c8a 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config var pipeCfg Config var globalCfg GlobalCfg + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(configKey, &pipeCfg) if err != nil { return nil, errors.E(op, err) @@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } @@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // PARSE CONFIGURATION ------- var globalCfg GlobalCfg + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(pluginName, &globalCfg) if err != nil { return nil, errors.E(op, err) @@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index a5aa1791..47336b43 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -2,12 +2,12 @@ package beanstalk import ( "bytes" + "context" "encoding/gob" "time" "github.com/beanstalkd/go-beanstalk" json "github.com/json-iterator/go" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -53,7 +53,7 @@ type Options struct { // Private ================ id uint64 conn *beanstalk.Conn - requeueCh chan *Item + requeueFn func(context.Context, *Item) error } // DelayDuration returns delay duration in a form of time.Duration. @@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err } + + // delete old job + err = i.Options.conn.Delete(i.Options.id) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } func fromJob(job *job.Job) *Item { @@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { } out.Options.conn = j.pool.conn out.Options.id = id - out.Options.requeueCh = j.requeueCh + out.Options.requeueFn = j.handleItem return nil } diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go deleted file mode 100644 index 21053940..00000000 --- a/plugins/jobs/drivers/beanstalk/requeue.go +++ /dev/null @@ -1,24 +0,0 @@ -package beanstalk - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index c1171ae2..9fab8d24 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -118,6 +118,10 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } +func (i *Item) Recycle() { + i.Options = nil +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 8d93b12c..5d741358 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,8 +50,7 @@ type JobConsumer struct { client *sqs.Client queueURL *string - requeueCh chan *Item - pauseCh chan struct{} + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } @@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index a761d6bd..f5fac0b3 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -64,7 +64,7 @@ type Options struct { queue *string receiptHandler *string client *sqs.Client - requeueCh chan *Item + requeueFn func(context.Context, *Item) error } // DelayDuration returns delay duration in a form of time.Duration. @@ -144,12 +144,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + // requeue message + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err } + + // Delete job from the queue only after successful requeue + _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } func fromJob(job *job.Job) *Item { @@ -227,7 +243,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { client: j.client, queue: j.queue, receiptHandler: msg.ReceiptHandle, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }, } diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go deleted file mode 100644 index 87e885e0..00000000 --- a/plugins/jobs/drivers/sqs/requeue.go +++ /dev/null @@ -1,25 +0,0 @@ -package sqs - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - // TODO(rustatian): what context to use - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 8ea18cfd..87559034 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -104,19 +104,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } -func (p *Plugin) getPayload(body, context []byte) *payload.Payload { - pld := p.pldPool.Get().(*payload.Payload) - pld.Body = body - pld.Context = context - return pld -} - -func (p *Plugin) putPayload(pld *payload.Payload) { - pld.Body = nil - pld.Context = nil - p.pldPool.Put(pld) -} - func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") @@ -261,6 +248,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + // free the resources + jb.Recycle() // return payload p.putPayload(exec) } @@ -565,3 +554,16 @@ func (p *Plugin) collectJobsEvents(event interface{}) { } } } + +func (p *Plugin) getPayload(body, context []byte) *payload.Payload { + pld := p.pldPool.Get().(*payload.Payload) + pld.Body = body + pld.Context = context + return pld +} + +func (p *Plugin) putPayload(pld *payload.Payload) { + pld.Body = nil + pld.Context = nil + p.pldPool.Put(pld) +} diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go index 691369d0..9d769fdf 100644 --- a/plugins/jobs/protocol.go +++ b/plugins/jobs/protocol.go @@ -10,8 +10,8 @@ import ( type Type uint32 const ( - Error Type = iota - NoError + NoError Type = iota + Error ) // internal worker protocol (jobs mode) @@ -19,7 +19,7 @@ type protocol struct { // message type, see Type T Type `json:"type"` // Payload - Data []byte `json:"data"` + Data json.RawMessage `json:"data"` } type errorResp struct { @@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { return errors.E(op, err) } - log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) + log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) if er.Requeue { err = jb.Requeue(er.Headers, er.Delay) @@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { } return nil } + + return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg)) + default: err = jb.Ack() if err != nil { diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index 77c78cb8..c89877e3 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -15,7 +15,8 @@ Types are: - `NO_ERROR`: contains only `type` and empty `data`. - `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, - `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value. + `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap + with string key and array of strings as a value. For example: @@ -41,12 +42,10 @@ For example: "headers": [ { "test": [ - { - "ttt": "11", - "ggg": "22" - } - ], - "test2": "2" + "1", + "2", + "3" + ] } ], "delay_seconds": 10 diff --git a/tests/jobs_err.php b/tests/jobs_err.php new file mode 100644 index 00000000..4ccea4f8 --- /dev/null +++ b/tests/jobs_err.php @@ -0,0 +1,52 @@ +<?php + +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; +use Spiral\Goridge\StreamRelay; + +require __DIR__ . "/vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT)); + +while ($in = $rr->waitPayload()) { + try { + $ctx = json_decode($in->header, true); + $headers = $ctx['headers']; + + $set = isset($headers['attempts']); + + $val = 0; + + if ($set == true) { + $val = intval($headers['attempts'][0]); + $val++; + $headers['attempts'][0] = strval($val); + } else { + $headers['attempts'][0] = "1"; + }; + + if ($val > 3) { + $rr->respond(new RoadRunner\Payload(json_encode([ + // no error + 'type' => 0, + 'data' => [] + ]))); + } else { + $rr->respond(new RoadRunner\Payload(json_encode([ + 'type' => 1, + 'data' => [ + 'message' => 'error', + 'requeue' => true, + 'delay_seconds' => 5, + 'headers' => $headers + ] + ]))); + } + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/tests/jobs_ok.php b/tests/jobs_ok.php new file mode 100644 index 00000000..fa58dd9a --- /dev/null +++ b/tests/jobs_ok.php @@ -0,0 +1,32 @@ +<?php + +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; +use Spiral\Goridge\StreamRelay; + +require __DIR__ . "/vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT)); + +while ($in = $rr->waitPayload()) { + try { + $ctx = json_decode($in->header, true); + $headers = $ctx['headers']; + + $rr->respond(new RoadRunner\Payload(json_encode([ + 'type' => 0, + 'data' => [ + 'message' => 'error', + 'requeue' => true, + 'delay_seconds' => 10, + 'headers' => $headers + ] + ]))); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml index 32883ce2..f9a7308b 100644 --- a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml +++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml index c06b5a79..43840545 100644 --- a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml +++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" @@ -15,7 +15,7 @@ logs: mode: development jobs: - num_pollers: 10 + num_pollers: 1 pipeline_size: 100000 timeout: 1 pool: diff --git a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml new file mode 100644 index 00000000..79493d96 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml index 022bf2f4..3555ef96 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml index 8ded8cf1..cf9069a8 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml new file mode 100644 index 00000000..a4f31290 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + # beanstalk address + addr: tcp://127.0.0.1:11300 + # connect timeout + timeout: 10s + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml new file mode 100644 index 00000000..87f46069 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -0,0 +1,31 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: beanstalk + priority: 11 + tube_priority: 1 + tube: default-1 + reserve_timeout: 10s + + consume: [ "test-1" ] diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index cbbf43d8..bb5281c0 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -48,6 +48,9 @@ func TestAMQPInit(t *testing.T) { mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -59,8 +62,7 @@ func TestAMQPInit(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -136,22 +138,116 @@ func TestAMQPDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -209,6 +305,7 @@ func TestAMQPDeclare(t *testing.T) { t.Run("DeclareAMQPPipeline", declareAMQPPipe) t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) t.Run("PauseAMQPPipeline", pausePipelines("test-3")) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index b36b4977..916ac08f 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -24,6 +24,7 @@ import ( jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestBeanstalkInit(t *testing.T) { @@ -47,19 +48,21 @@ func TestBeanstalkInit(t *testing.T) { mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -135,22 +138,123 @@ func TestBeanstalkDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) + t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) + t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} +func TestBeanstalkJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -208,7 +312,9 @@ func TestBeanstalkDeclare(t *testing.T) { t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) @@ -216,6 +322,36 @@ func TestBeanstalkDeclare(t *testing.T) { wg.Wait() } +func TestBeanstalkNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + func declareBeanstalkPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) |