diff options
author | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
commit | c7d9385f135853539100430521042f7e7e2ae005 (patch) | |
tree | 588f45f6cfcd716bb3197ebff8cfdbc86a984afc /tests/plugins/jobs | |
parent | f6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff) |
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/jobs')
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-no-global.yaml | 3 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-no-global.yaml | 12 | ||||
-rw-r--r-- | tests/plugins/jobs/helpers.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_boltdb_test.go | 524 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 5 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 14 |
7 files changed, 286 insertions, 276 deletions
diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml index 87f46069..92d090d4 100644 --- a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -29,3 +29,6 @@ jobs: reserve_timeout: 10s consume: [ "test-1" ] + +endure: + log_level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml index 1c09bef9..54aaf3c6 100644 --- a/tests/plugins/jobs/boltdb/.rr-no-global.yaml +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -26,15 +26,15 @@ jobs: pipelines: test-1: driver: boltdb - prefetch: 100 - file: "rr1.db" - priority: 1 + prefetch: 100 + file: "rr1.db" + priority: 1 test-2: driver: boltdb - prefetch: 100 - file: "rr2.db" - priority: 1 + prefetch: 100 + file: "rr2.db" + priority: 1 # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually consume: [ "test-1", "test-2" ] diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 5067ef9f..6c2d05ca 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -95,7 +95,7 @@ func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ Job: "some/php/namespace", - Id: "1", + Id: "2", Payload: `{"hello":"world"}`, Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, Options: &jobsv1beta.Options{ diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 9f4d37ec..78d154b1 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -466,7 +466,7 @@ func TestBeanstalkStats(t *testing.T) { } func TestBeanstalkNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go index cf3e5a91..15d2bce8 100644 --- a/tests/plugins/jobs/jobs_boltdb_test.go +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -12,6 +12,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -22,6 +23,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -222,266 +224,268 @@ func TestBoltDBDeclare(t *testing.T) { assert.NoError(t, os.Remove(rr1db)) } -// -//func TestAMQPJobsError(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-amqp-jobs-err.yaml", -// Prefix: "rr", -// } -// -// controller := gomock.NewController(t) -// mockLogger := mocks.NewMockLogger(controller) -// -// // general -// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() -// -// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// -// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) -// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// mockLogger, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// ch, err := cont.Serve() -// if err != nil { -// t.Fatal(err) -// } -// -// sig := make(chan os.Signal, 1) -// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// -// stopCh := make(chan struct{}, 1) -// -// go func() { -// defer wg.Done() -// for { -// select { -// case e := <-ch: -// assert.Fail(t, "error", e.Error.Error()) -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// case <-sig: -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// case <-stopCh: -// // timeout -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// } -// } -// }() -// -// time.Sleep(time.Second * 3) -// -// t.Run("DeclareAMQPPipeline", declareAMQPPipe) -// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// time.Sleep(time.Second * 25) -// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) -// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -// -// time.Sleep(time.Second * 5) -// stopCh <- struct{}{} -// wg.Wait() -//} -// -//func TestAMQPNoGlobalSection(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-no-global.yaml", -// Prefix: "rr", -// } -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// &logger.ZapLogger{}, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// _, err = cont.Serve() -// require.Error(t, err) -//} -// -//func TestAMQPStats(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-amqp-declare.yaml", -// Prefix: "rr", -// } -// -// controller := gomock.NewController(t) -// mockLogger := mocks.NewMockLogger(controller) -// -// // general -// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() -// -// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) -// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// mockLogger, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// ch, err := cont.Serve() -// if err != nil { -// t.Fatal(err) -// } -// -// sig := make(chan os.Signal, 1) -// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// -// stopCh := make(chan struct{}, 1) -// -// go func() { -// defer wg.Done() -// for { -// select { -// case e := <-ch: -// assert.Fail(t, "error", e.Error.Error()) -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// case <-sig: -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// case <-stopCh: -// // timeout -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// } -// } -// }() -// -// time.Sleep(time.Second * 3) -// -// t.Run("DeclareAMQPPipeline", declareAMQPPipe) -// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// time.Sleep(time.Second * 2) -// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) -// time.Sleep(time.Second * 2) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) -// -// out := &jobState.State{} -// t.Run("Stats", stats(out)) -// -// assert.Equal(t, out.Pipeline, "test-3") -// assert.Equal(t, out.Driver, "amqp") -// assert.Equal(t, out.Queue, "default") -// -// assert.Equal(t, int64(1), out.Active) -// assert.Equal(t, int64(1), out.Delayed) -// assert.Equal(t, int64(0), out.Reserved) -// assert.Equal(t, false, out.Ready) -// -// time.Sleep(time.Second) -// t.Run("ResumePipeline", resumePipes("test-3")) -// time.Sleep(time.Second * 7) -// -// out = &jobState.State{} -// t.Run("Stats", stats(out)) -// -// assert.Equal(t, out.Pipeline, "test-3") -// assert.Equal(t, out.Driver, "amqp") -// assert.Equal(t, out.Queue, "default") -// -// assert.Equal(t, int64(0), out.Active) -// assert.Equal(t, int64(0), out.Delayed) -// assert.Equal(t, int64(0), out.Reserved) -// assert.Equal(t, true, out.Ready) -// -// time.Sleep(time.Second) -// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -// -// time.Sleep(time.Second * 5) -// stopCh <- struct{}{} -// wg.Wait() -//} -// +func TestBoltDBJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-jobs-err.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func TestBoltDBStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} func declareBoltDBPipe(file string) func(t *testing.T) { return func(t *testing.T) { diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 91855ee9..951d6227 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -171,9 +171,12 @@ func TestJOBSMetrics(t *testing.T) { signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) tt := time.NewTimer(time.Minute * 3) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { defer tt.Stop() + defer wg.Done() for { select { case e := <-ch: @@ -220,7 +223,7 @@ func TestJOBSMetrics(t *testing.T) { assert.Contains(t, genericOut, "workers_memory_bytes") close(sig) - time.Sleep(time.Second * 2) + wg.Wait() } const getAddr = "http://127.0.0.1:2112/metrics" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 95abe9dc..2dd2c8db 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -437,15 +437,15 @@ func TestSQSStat(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareSQSPipeline", declareSQSPipe) - t.Run("ConsumeSQSPipeline", resumePipes("test-3")) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareSQSPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseSQSPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} @@ -474,7 +474,7 @@ func TestSQSStat(t *testing.T) { assert.Equal(t, int64(0), out.Delayed) assert.Equal(t, int64(0), out.Reserved) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} |