summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
committerValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
commit2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
treed796a11941fab4be668843a3fcbd83ea0859db39
parente855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/priority_queue/binary_heap_test.go2
-rw-r--r--pkg/priority_queue/interface.go3
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go108
-rw-r--r--plugins/jobs/drivers/amqp/item.go37
-rw-r--r--plugins/jobs/drivers/amqp/requeue.go34
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go18
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go27
-rw-r--r--plugins/jobs/drivers/beanstalk/requeue.go24
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go4
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go9
-rw-r--r--plugins/jobs/drivers/sqs/item.go30
-rw-r--r--plugins/jobs/drivers/sqs/requeue.go25
-rw-r--r--plugins/jobs/plugin.go28
-rw-r--r--plugins/jobs/protocol.go11
-rw-r--r--plugins/jobs/response_protocol.md13
-rw-r--r--tests/jobs_err.php52
-rw-r--r--tests/jobs_ok.php32
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-declare.yaml2
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-init.yaml4
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml24
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml2
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml2
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml27
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-no-global.yaml31
-rw-r--r--tests/plugins/jobs/jobs_amqp_test.go115
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go160
26 files changed, 585 insertions, 239 deletions
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index fb5b83de..05ddf5ef 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -40,6 +40,8 @@ func (t Test) Priority() int64 {
return int64(t)
}
+func (t Test) Recycle() {}
+
func TestBinHeap_Init(t *testing.T) {
a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 9efa4652..0034cbd3 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -28,4 +28,7 @@ type Item interface {
// Requeue - put the message back to the queue with the optional delay
Requeue(headers map[string][]string, delay int64) error
+
+ // Recycle frees resources allocated by the Item
+ Recycle()
}
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index d7425858..429953e1 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -54,7 +54,6 @@ type JobConsumer struct {
listeners uint32
stopCh chan struct{}
- requeueCh chan *Item
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -112,7 +111,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
exclusive: pipeCfg.Exclusive,
multipleAck: pipeCfg.MultipleAck,
requeueOnFail: pipeCfg.RequeueOnFail,
- requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -137,7 +135,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// run redialer and requeue listener for the connection
jb.redialer()
- jb.requeueListener()
return jb, nil
}
@@ -184,7 +181,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exclusive: pipeline.Bool(exclusive, false),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
- requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -213,7 +209,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// run redialer for the connection
jb.redialer()
- jb.requeueListener()
return jb, nil
}
@@ -228,9 +223,17 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
}
- // lock needed here to protect redial concurrent operation
- // we may be in the redial state here
+ err := j.handleItem(ctx, fromJob(job))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
select {
case pch := <-j.publishChan:
// return the channel back
@@ -239,40 +242,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
}()
// convert
- msg := fromJob(job)
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- err = j.handleItem(msg, p, pch)
+ table, err := pack(msg.ID(), msg)
if err != nil {
return errors.E(op, err)
}
- return nil
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
-// handleItem
-func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error {
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
@@ -284,29 +282,16 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel)
return errors.E(op, err)
}
- return nil
- }
-
- _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
+ j.delayCache[tmpQ] = struct{}{}
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
+ return nil
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
- Timestamp: time.Now().UTC(),
+ Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
@@ -315,25 +300,10 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel)
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
}
-
- // insert to the local, limited pipeline
- err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -491,8 +461,6 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
- close(j.requeueCh)
-
pipe := j.pipeline.Load().(*pipeline.Pipeline)
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 908dbd15..f252acd8 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "context"
+ "fmt"
"time"
json "github.com/json-iterator/go"
@@ -52,7 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
multipleAsk bool
requeue bool
@@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errAck := i.Options.nack(false, true)
+ if errAck != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ }
+
+ return err
}
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- // requeue channel
- item.Options.requeueCh = j.requeueCh
+
+ // requeue func
+ item.Options.requeueFn = j.handleItem
return i, nil
}
@@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go
deleted file mode 100644
index a2b3b26c..00000000
--- a/plugins/jobs/drivers/amqp/requeue.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package amqp
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- pch := <-j.publishChan
-
- headers, err := pack(item.ID(), item)
- if err != nil {
- j.publishChan <- pch
- j.log.Error("requeue pack", "error", err)
- continue
- }
-
- err = j.handleItem(item, headers, pch)
- if err != nil {
- j.publishChan <- pch
- j.log.Error("requeue handle item", "error", err)
- continue
- }
-
- j.publishChan <- pch
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 21b05b16..f41a2c8a 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
var pipeCfg Config
var globalCfg GlobalCfg
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(configKey, &pipeCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
@@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// PARSE CONFIGURATION -------
var globalCfg GlobalCfg
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(pluginName, &globalCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index a5aa1791..47336b43 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -2,12 +2,12 @@ package beanstalk
import (
"bytes"
+ "context"
"encoding/gob"
"time"
"github.com/beanstalkd/go-beanstalk"
json "github.com/json-iterator/go"
- "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -53,7 +53,7 @@ type Options struct {
// Private ================
id uint64
conn *beanstalk.Conn
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // delete old job
+ err = i.Options.conn.Delete(i.Options.id)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
}
out.Options.conn = j.pool.conn
out.Options.id = id
- out.Options.requeueCh = j.requeueCh
+ out.Options.requeueFn = j.handleItem
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go
deleted file mode 100644
index 21053940..00000000
--- a/plugins/jobs/drivers/beanstalk/requeue.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index c1171ae2..9fab8d24 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -118,6 +118,10 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+func (i *Item) Recycle() {
+ i.Options = nil
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 8d93b12c..5d741358 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -50,8 +50,7 @@ type JobConsumer struct {
client *sqs.Client
queueURL *string
- requeueCh chan *Item
- pauseCh chan struct{}
+ pauseCh chan struct{}
}
func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
- jb.requeueListener()
-
return jb, nil
}
@@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
- jb.requeueListener()
-
return jb, nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index a761d6bd..f5fac0b3 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -64,7 +64,7 @@ type Options struct {
queue *string
receiptHandler *string
client *sqs.Client
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -144,12 +144,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ // requeue message
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // Delete job from the queue only after successful requeue
+ _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -227,7 +243,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
client: j.client,
queue: j.queue,
receiptHandler: msg.ReceiptHandle,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
},
}
diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go
deleted file mode 100644
index 87e885e0..00000000
--- a/plugins/jobs/drivers/sqs/requeue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package sqs
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- // TODO(rustatian): what context to use
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 8ea18cfd..87559034 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -104,19 +104,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
-func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
- pld := p.pldPool.Get().(*payload.Payload)
- pld.Body = body
- pld.Context = context
- return pld
-}
-
-func (p *Plugin) putPayload(pld *payload.Payload) {
- pld.Body = nil
- pld.Context = nil
- p.pldPool.Put(pld)
-}
-
func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
@@ -261,6 +248,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ // free the resources
+ jb.Recycle()
// return payload
p.putPayload(exec)
}
@@ -565,3 +554,16 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
}
}
}
+
+func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
+ pld := p.pldPool.Get().(*payload.Payload)
+ pld.Body = body
+ pld.Context = context
+ return pld
+}
+
+func (p *Plugin) putPayload(pld *payload.Payload) {
+ pld.Body = nil
+ pld.Context = nil
+ p.pldPool.Put(pld)
+}
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index 691369d0..9d769fdf 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -10,8 +10,8 @@ import (
type Type uint32
const (
- Error Type = iota
- NoError
+ NoError Type = iota
+ Error
)
// internal worker protocol (jobs mode)
@@ -19,7 +19,7 @@ type protocol struct {
// message type, see Type
T Type `json:"type"`
// Payload
- Data []byte `json:"data"`
+ Data json.RawMessage `json:"data"`
}
type errorResp struct {
@@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
return errors.E(op, err)
}
- log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+ log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
if er.Requeue {
err = jb.Requeue(er.Headers, er.Delay)
@@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
}
return nil
}
+
+ return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
+
default:
err = jb.Ack()
if err != nil {
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index 77c78cb8..c89877e3 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,8 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap with string key and array of strings as a value.
+ `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+ with string key and array of strings as a value.
For example:
@@ -41,12 +42,10 @@ For example:
"headers": [
{
"test": [
- {
- "ttt": "11",
- "ggg": "22"
- }
- ],
- "test2": "2"
+ "1",
+ "2",
+ "3"
+ ]
}
],
"delay_seconds": 10
diff --git a/tests/jobs_err.php b/tests/jobs_err.php
new file mode 100644
index 00000000..4ccea4f8
--- /dev/null
+++ b/tests/jobs_err.php
@@ -0,0 +1,52 @@
+<?php
+
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+use Spiral\Goridge\StreamRelay;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT));
+
+while ($in = $rr->waitPayload()) {
+ try {
+ $ctx = json_decode($in->header, true);
+ $headers = $ctx['headers'];
+
+ $set = isset($headers['attempts']);
+
+ $val = 0;
+
+ if ($set == true) {
+ $val = intval($headers['attempts'][0]);
+ $val++;
+ $headers['attempts'][0] = strval($val);
+ } else {
+ $headers['attempts'][0] = "1";
+ };
+
+ if ($val > 3) {
+ $rr->respond(new RoadRunner\Payload(json_encode([
+ // no error
+ 'type' => 0,
+ 'data' => []
+ ])));
+ } else {
+ $rr->respond(new RoadRunner\Payload(json_encode([
+ 'type' => 1,
+ 'data' => [
+ 'message' => 'error',
+ 'requeue' => true,
+ 'delay_seconds' => 5,
+ 'headers' => $headers
+ ]
+ ])));
+ }
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+}
diff --git a/tests/jobs_ok.php b/tests/jobs_ok.php
new file mode 100644
index 00000000..fa58dd9a
--- /dev/null
+++ b/tests/jobs_ok.php
@@ -0,0 +1,32 @@
+<?php
+
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+use Spiral\Goridge\StreamRelay;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT));
+
+while ($in = $rr->waitPayload()) {
+ try {
+ $ctx = json_decode($in->header, true);
+ $headers = $ctx['headers'];
+
+ $rr->respond(new RoadRunner\Payload(json_encode([
+ 'type' => 0,
+ 'data' => [
+ 'message' => 'error',
+ 'requeue' => true,
+ 'delay_seconds' => 10,
+ 'headers' => $headers
+ ]
+ ])));
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+}
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
index 32883ce2..f9a7308b 100644
--- a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
+++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
index c06b5a79..43840545 100644
--- a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
+++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
@@ -15,7 +15,7 @@ logs:
mode: development
jobs:
- num_pollers: 10
+ num_pollers: 1
pipeline_size: 100000
timeout: 1
pool:
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml
new file mode 100644
index 00000000..79493d96
--- /dev/null
+++ b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml
@@ -0,0 +1,24 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../jobs_err.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:[email protected]:5672/
+
+logs:
+ level: debug
+ encoding: console
+ mode: development
+
+jobs:
+ num_pollers: 1
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml
index 022bf2f4..3555ef96 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
index 8ded8cf1..cf9069a8 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
new file mode 100644
index 00000000..a4f31290
--- /dev/null
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
@@ -0,0 +1,27 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../jobs_err.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+beanstalk:
+ # beanstalk address
+ addr: tcp://127.0.0.1:11300
+ # connect timeout
+ timeout: 10s
+
+logs:
+ level: debug
+ encoding: console
+ mode: development
+
+jobs:
+ num_pollers: 10
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
new file mode 100644
index 00000000..87f46069
--- /dev/null
+++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
@@ -0,0 +1,31 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../jobs_ok.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+logs:
+ level: error
+ mode: development
+
+jobs:
+ num_pollers: 10
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-1:
+ driver: beanstalk
+ priority: 11
+ tube_priority: 1
+ tube: default-1
+ reserve_timeout: 10s
+
+ consume: [ "test-1" ]
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
index cbbf43d8..bb5281c0 100644
--- a/tests/plugins/jobs/jobs_amqp_test.go
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -48,6 +48,9 @@ func TestAMQPInit(t *testing.T) {
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -59,8 +62,7 @@ func TestAMQPInit(t *testing.T) {
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -136,22 +138,116 @@ func TestAMQPDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareAMQPPipeline", declareAMQPPipe)
+ t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+ t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestAMQPJobsError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "amqp/.rr-amqp-jobs-err.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -209,6 +305,7 @@ func TestAMQPDeclare(t *testing.T) {
t.Run("DeclareAMQPPipeline", declareAMQPPipe)
t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 25)
t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index b36b4977..916ac08f 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -24,6 +24,7 @@ import (
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/spiral/roadrunner/v2/tests/mocks"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestBeanstalkInit(t *testing.T) {
@@ -47,19 +48,21 @@ func TestBeanstalkInit(t *testing.T) {
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-
- mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -135,22 +138,123 @@ func TestBeanstalkDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
- mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &beanstalk.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe)
+ t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3"))
+ t.Run("PushBeanstalkPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseBeanstalkPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+ t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+func TestBeanstalkJobsError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "beanstalk/.rr-beanstalk-jobs-err.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
+
+ mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -208,7 +312,9 @@ func TestBeanstalkDeclare(t *testing.T) {
t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe)
t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3"))
t.Run("PushBeanstalkPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 25)
t.Run("PauseBeanstalkPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
@@ -216,6 +322,36 @@ func TestBeanstalkDeclare(t *testing.T) {
wg.Wait()
}
+func TestBeanstalkNoGlobalSection(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "beanstalk/.rr-no-global.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &beanstalk.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ require.Error(t, err)
+}
+
func declareBeanstalkPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)