diff options
author | Valery Piashchynski <[email protected]> | 2021-07-23 20:50:24 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-23 20:50:24 +0300 |
commit | c61756635c0d1b25b304627c8a693f2e9e2ee4b3 (patch) | |
tree | a10857e07a2d58a1d2ff602974de04f3bfbee6f6 | |
parent | e88dfd5cd10662f0ad68e69f9d9de2f66ddf26d0 (diff) |
SQS initial durability test
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 22 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/listener.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/rabbit_init.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 5 | ||||
-rw-r--r-- | tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml | 59 | ||||
-rw-r--r-- | tests/plugins/jobs/helpers.go | 27 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_with_toxics_test.go | 127 | ||||
-rw-r--r-- | tests/plugins/jobs/sqs/.rr-sqs-init.yaml | 2 |
11 files changed, 244 insertions, 32 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 32119273..8c55399c 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -17,7 +17,7 @@ import ( "github.com/streadway/amqp" ) -type JobsConsumer struct { +type JobConsumer struct { sync.Mutex log logger.Logger pq priorityqueue.Queue @@ -50,7 +50,7 @@ type JobsConsumer struct { } // NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -84,7 +84,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, globalCfg.InitDefault() // PARSE CONFIGURATION END ------- - jb := &JobsConsumer{ + jb := &JobConsumer{ log: log, pq: pq, eh: e, @@ -129,7 +129,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_amqp_consumer_from_pipeline") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -152,7 +152,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // PARSE CONFIGURATION ------- - jb := &JobsConsumer{ + jb := &JobConsumer{ log: log, eh: e, pq: pq, @@ -200,7 +200,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *JobsConsumer) Push(job *job.Job) error { +func (j *JobConsumer) Push(job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered @@ -298,12 +298,12 @@ func (j *JobsConsumer) Push(job *job.Job) error { return nil } -func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { j.pipeline.Store(pipeline) return nil } -func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -353,7 +353,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { return nil } -func (j *JobsConsumer) Pause(p string) { +func (j *JobConsumer) Pause(p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -391,7 +391,7 @@ func (j *JobsConsumer) Pause(p string) { }) } -func (j *JobsConsumer) Resume(p string) { +func (j *JobConsumer) Resume(p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -450,7 +450,7 @@ func (j *JobsConsumer) Resume(p string) { }) } -func (j *JobsConsumer) Stop() error { +func (j *JobConsumer) Stop() error { j.stopCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 1ebe2751..295ccfd3 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -111,7 +111,7 @@ func (j *Item) Nack() error { return j.NackFunc(false, j.Options.requeue) } -func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") item, err := j.unpack(d) if err != nil { @@ -161,7 +161,7 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { +func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go index 7241c717..8011aa3b 100644 --- a/plugins/jobs/drivers/amqp/listener.go +++ b/plugins/jobs/drivers/amqp/listener.go @@ -2,7 +2,7 @@ package amqp import "github.com/streadway/amqp" -func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { +func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) { go func() { for { //nolint:gosimple select { diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go index d6b8a708..570498e9 100644 --- a/plugins/jobs/drivers/amqp/rabbit_init.go +++ b/plugins/jobs/drivers/amqp/rabbit_init.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" ) -func (j *JobsConsumer) initRabbitMQ() error { +func (j *JobConsumer) initRabbitMQ() error { const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index 47afdcb3..d61c75b2 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -11,7 +11,7 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *JobsConsumer) redialer() { //nolint:gocognit +func (j *JobConsumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index ff8f7860..9de64b82 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -22,7 +22,7 @@ type Config struct { Prefetch uint64 `mapstructure:"prefetch"` } -type JobBroker struct { +type JobConsumer struct { cfg *Config log logger.Logger eh events.Handler @@ -35,10 +35,10 @@ type JobBroker struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_ephemeral_pipeline") - jb := &JobBroker{ + jb := &JobConsumer{ log: log, pq: pq, eh: eh, @@ -64,8 +64,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { - jb := &JobBroker{ +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { + jb := &JobConsumer{ log: log, pq: pq, eh: eh, @@ -82,7 +82,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobBroker) Push(jb *job.Job) error { +func (j *JobConsumer) Push(jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered @@ -127,7 +127,7 @@ func (j *JobBroker) Push(jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } -func (j *JobBroker) consume() { +func (j *JobConsumer) consume() { // redirect for { select { @@ -139,7 +139,7 @@ func (j *JobBroker) consume() { } } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) @@ -150,7 +150,7 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobBroker) Pause(pipeline string) { +func (j *JobConsumer) Pause(pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off @@ -166,7 +166,7 @@ func (j *JobBroker) Pause(pipeline string) { }) } -func (j *JobBroker) Resume(pipeline string) { +func (j *JobConsumer) Resume(pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned on @@ -183,7 +183,7 @@ func (j *JobBroker) Resume(pipeline string) { } // Run is no-op for the ephemeral -func (j *JobBroker) Run(pipe *pipeline.Pipeline) error { +func (j *JobConsumer) Run(pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -193,7 +193,7 @@ func (j *JobBroker) Run(pipe *pipeline.Pipeline) error { return nil } -func (j *JobBroker) Stop() error { +func (j *JobConsumer) Stop() error { var pipe string j.pipeline.Range(func(key, _ interface{}) bool { pipe = key.(string) diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index f6311715..08a6170e 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -256,9 +256,12 @@ func (j *JobConsumer) Push(jb *job.Job) error { msg := fromJob(jb) + // 10 seconds deadline to make a request TODO ??? + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) + defer cancel() // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL)) + _, err := j.client.SendMessage(ctx, msg.pack(j.queueURL)) if err != nil { return errors.E(op, err) } diff --git a/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml new file mode 100644 index 00000000..d7d93fe6 --- /dev/null +++ b/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml @@ -0,0 +1,59 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:19324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: sqs + prefetch: 10 + visibility_timeout: 0 + wait_time_seconds: 1 + queue: default + # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html + attributes: + DelaySeconds: 0 + MaximumMessageSize: 262144 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 0 + VisibilityTimeout: 0 + tags: + test: "tag" + + test-2: + driver: sqs + prefetch: 10 + queue: default-2 + wait_time_seconds: 1 + attributes: + MessageRetentionPeriod: 86400 + tags: + test: "tag" + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 27b2d1e0..d8c32a49 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -81,6 +81,33 @@ func pushToPipe(pipeline string) func(t *testing.T) { } } +func pushToPipeExpectErr(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: 0, + Attempts: 0, + RetryDelay: 0, + Timeout: 0, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Push", req, er) + require.Error(t, err) + } +} + func pausePipelines(pipes ...string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index 93452cc4..071c04be 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -15,6 +15,8 @@ import ( "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" + "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -70,7 +72,8 @@ func TestDurabilityAMQP(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - mockLogger, + // mockLogger, + &logger.ZapLogger{}, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -126,9 +129,131 @@ func TestDurabilityAMQP(t *testing.T) { time.Sleep(time.Second * 3) disableProxy("redial", t) time.Sleep(time.Second * 5) + + t.Run("DestroyPipelineWhileRedialing", destroyPipelines("test-1", "test-2")) + enableProxy("redial", t) time.Sleep(time.Second * 3) stopCh <- struct{}{} wg.Wait() } + +func TestDurabilitySQS(t *testing.T) { + client := toxiproxy.NewClient("localhost:8474") + + _, err := client.CreateProxy("redial", "localhost:19324", "localhost:9324") + require.NoError(t, err) + defer deleteProxy("redial", t) + + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + require.NoError(t, err) + + cfg := &config.Viper{ + Path: "durability/.rr-sqs-durability-redial.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(4) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) + + // redial errors + mockLogger.EXPECT().Warn("rabbitmq reconnecting, caused by", "error", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("rabbitmq dial succeed. trying to redeclare queues and subscribers").AnyTimes() + mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + // mockLogger, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + require.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + disableProxy("redial", t) + time.Sleep(time.Second * 3) + + t.Run("PushPipelineWhileRedialing-1", pushToPipeExpectErr("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipeExpectErr("test-2")) + + enableProxy("redial", t) + + t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) + + time.Sleep(time.Second * 10) + + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml index ca2f7652..8c62bbdb 100644 --- a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml +++ b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml @@ -6,8 +6,6 @@ server: relay: "pipes" relay_timeout: "20s" -# amazon sqs configuration -# General section sqs: key: api-key secret: api-secret |