diff options
author | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-30 21:32:50 +0300 |
commit | c7d9385f135853539100430521042f7e7e2ae005 (patch) | |
tree | 588f45f6cfcd716bb3197ebff8cfdbc86a984afc /tests/plugins/jobs/jobs_boltdb_test.go | |
parent | f6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff) |
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/jobs/jobs_boltdb_test.go')
-rw-r--r-- | tests/plugins/jobs/jobs_boltdb_test.go | 524 |
1 files changed, 264 insertions, 260 deletions
diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go index cf3e5a91..15d2bce8 100644 --- a/tests/plugins/jobs/jobs_boltdb_test.go +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -12,6 +12,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -22,6 +23,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -222,266 +224,268 @@ func TestBoltDBDeclare(t *testing.T) { assert.NoError(t, os.Remove(rr1db)) } -// -//func TestAMQPJobsError(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-amqp-jobs-err.yaml", -// Prefix: "rr", -// } -// -// controller := gomock.NewController(t) -// mockLogger := mocks.NewMockLogger(controller) -// -// // general -// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() -// -// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// -// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) -// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// mockLogger, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// ch, err := cont.Serve() -// if err != nil { -// t.Fatal(err) -// } -// -// sig := make(chan os.Signal, 1) -// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// -// stopCh := make(chan struct{}, 1) -// -// go func() { -// defer wg.Done() -// for { -// select { -// case e := <-ch: -// assert.Fail(t, "error", e.Error.Error()) -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// case <-sig: -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// case <-stopCh: -// // timeout -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// } -// } -// }() -// -// time.Sleep(time.Second * 3) -// -// t.Run("DeclareAMQPPipeline", declareAMQPPipe) -// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// time.Sleep(time.Second * 25) -// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) -// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -// -// time.Sleep(time.Second * 5) -// stopCh <- struct{}{} -// wg.Wait() -//} -// -//func TestAMQPNoGlobalSection(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-no-global.yaml", -// Prefix: "rr", -// } -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// &logger.ZapLogger{}, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// _, err = cont.Serve() -// require.Error(t, err) -//} -// -//func TestAMQPStats(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-amqp-declare.yaml", -// Prefix: "rr", -// } -// -// controller := gomock.NewController(t) -// mockLogger := mocks.NewMockLogger(controller) -// -// // general -// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() -// -// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) -// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// mockLogger, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// ch, err := cont.Serve() -// if err != nil { -// t.Fatal(err) -// } -// -// sig := make(chan os.Signal, 1) -// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// -// stopCh := make(chan struct{}, 1) -// -// go func() { -// defer wg.Done() -// for { -// select { -// case e := <-ch: -// assert.Fail(t, "error", e.Error.Error()) -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// case <-sig: -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// case <-stopCh: -// // timeout -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// } -// } -// }() -// -// time.Sleep(time.Second * 3) -// -// t.Run("DeclareAMQPPipeline", declareAMQPPipe) -// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// time.Sleep(time.Second * 2) -// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) -// time.Sleep(time.Second * 2) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) -// -// out := &jobState.State{} -// t.Run("Stats", stats(out)) -// -// assert.Equal(t, out.Pipeline, "test-3") -// assert.Equal(t, out.Driver, "amqp") -// assert.Equal(t, out.Queue, "default") -// -// assert.Equal(t, int64(1), out.Active) -// assert.Equal(t, int64(1), out.Delayed) -// assert.Equal(t, int64(0), out.Reserved) -// assert.Equal(t, false, out.Ready) -// -// time.Sleep(time.Second) -// t.Run("ResumePipeline", resumePipes("test-3")) -// time.Sleep(time.Second * 7) -// -// out = &jobState.State{} -// t.Run("Stats", stats(out)) -// -// assert.Equal(t, out.Pipeline, "test-3") -// assert.Equal(t, out.Driver, "amqp") -// assert.Equal(t, out.Queue, "default") -// -// assert.Equal(t, int64(0), out.Active) -// assert.Equal(t, int64(0), out.Delayed) -// assert.Equal(t, int64(0), out.Reserved) -// assert.Equal(t, true, out.Ready) -// -// time.Sleep(time.Second) -// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -// -// time.Sleep(time.Second * 5) -// stopCh <- struct{}{} -// wg.Wait() -//} -// +func TestBoltDBJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-jobs-err.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func TestBoltDBStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} func declareBoltDBPipe(file string) func(t *testing.T) { return func(t *testing.T) { |