From e9713a1d08a93e2be70c889c600ed89f54822b54 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 18:05:31 +0300 Subject: Fix AMQP bugs, add more amqp tests Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 253 +++++++++++++++++++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 tests/plugins/jobs/jobs_amqp_test.go (limited to 'tests/plugins/jobs/jobs_amqp_test.go') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go new file mode 100644 index 00000000..7e74891c --- /dev/null +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -0,0 +1,253 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestAMQPInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", consumeAMQPPipe) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "amqp", + "name": "test-3", + "routing-key": "test-3", + "queue": "default", + "exchange-type": "direct", + "exchange": "amqp.default", + "prefetch": "100", + "priority": "3", + "exclusive": "true", + "multiple_ask": "true", + "requeue_on_fail": "true", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} + +func consumeAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} + pipe.GetPipelines()[0] = "test-3" + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Resume", pipe, er) + assert.NoError(t, err) +} -- cgit v1.2.3 From 2ceebd687fd17b6029ef3df0e979c39bb39abc7f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 18:10:33 +0300 Subject: Linters Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'tests/plugins/jobs/jobs_amqp_test.go') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 7e74891c..52bb6c7c 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -149,7 +149,7 @@ func TestAMQPDeclare(t *testing.T) { &server.Plugin{}, &rpcPlugin.Plugin{}, &logger.ZapLogger{}, - //mockLogger, + // mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, -- cgit v1.2.3 From 584e9ad1f50223f873661babae3b365a2b0662ec Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Jul 2021 19:41:11 +0300 Subject: Initial tests for all drivers Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) (limited to 'tests/plugins/jobs/jobs_amqp_test.go') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 52bb6c7c..a95ac98b 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -205,7 +205,7 @@ func TestAMQPDeclare(t *testing.T) { time.Sleep(time.Second * 3) t.Run("DeclareAMQPPipeline", declareAMQPPipe) - t.Run("ConsumeAMQPPipeline", consumeAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) t.Run("PushAMQPPipeline", pushToPipe("test-3")) t.Run("PauseAMQPPipeline", pausePipelines("test-3")) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) @@ -238,16 +238,3 @@ func declareAMQPPipe(t *testing.T) { err = client.Call("jobs.Declare", pipe, er) assert.NoError(t, err) } - -func consumeAMQPPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-3" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} -- cgit v1.2.3 From f585c654027f6dc40d7e3a3cdf7906006ed2427a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 23 Jul 2021 09:50:41 +0300 Subject: Use zap logger in AMQP tests instead of mocked (temporarly) Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'tests/plugins/jobs/jobs_amqp_test.go') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index a95ac98b..8f87072b 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -58,7 +58,8 @@ func TestAMQPInit(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - mockLogger, + &logger.ZapLogger{}, + // mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, -- cgit v1.2.3 From 9079cc51599d6e21ac59e34573f7bbf2e2e87b9e Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 23 Jul 2021 13:25:09 +0300 Subject: Tests refactoring Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) (limited to 'tests/plugins/jobs/jobs_amqp_test.go') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 8f87072b..cbbf43d8 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -24,6 +24,7 @@ import ( jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAMQPInit(t *testing.T) { @@ -239,3 +240,33 @@ func declareAMQPPipe(t *testing.T) { err = client.Call("jobs.Declare", pipe, er) assert.NoError(t, err) } + +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} -- cgit v1.2.3 From 2d460062c97f9ad1e793831c54ae4d177dea83e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 22:03:34 +0300 Subject: Durable requeue algo. Update AMQP and Beanstalk tests to use mock logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 115 ++++++++++++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 9 deletions(-) (limited to 'tests/plugins/jobs/jobs_amqp_test.go') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index cbbf43d8..bb5281c0 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -48,6 +48,9 @@ func TestAMQPInit(t *testing.T) { mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -59,8 +62,7 @@ func TestAMQPInit(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -136,22 +138,116 @@ func TestAMQPDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -209,6 +305,7 @@ func TestAMQPDeclare(t *testing.T) { t.Run("DeclareAMQPPipeline", declareAMQPPipe) t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) t.Run("PauseAMQPPipeline", pausePipelines("test-3")) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -- cgit v1.2.3