From 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 25 Aug 2021 18:03:30 +0300 Subject: BoltDB local queue initial commit Signed-off-by: Valery Piashchynski --- tests/plugins/kv/storage_plugin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'tests/plugins') diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index ced1c5fe..a09a456c 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -12,9 +12,9 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/memory" -- cgit v1.2.3 From efb3efa98c8555815330274f0618bfc080f4c65c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 26 Aug 2021 18:32:51 +0300 Subject: Move drivers to the plugin's root. Fix #771, add tests. Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_amqp_test.go | 2 +- tests/plugins/jobs/jobs_beanstalk_test.go | 2 +- tests/plugins/jobs/jobs_ephemeral_test.go | 2 +- tests/plugins/jobs/jobs_general_test.go | 4 ++-- tests/plugins/jobs/jobs_sqs_test.go | 2 +- tests/plugins/jobs/jobs_with_toxics_test.go | 6 +++--- tests/plugins/kv/storage_plugin_test.go | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 48d6515d..949698ec 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 8e74c7cc..9f4d37ec 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 98590a96..2890aa9d 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index f0b5697b..91855ee9 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -12,11 +12,11 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 630a059a..95abe9dc 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -17,11 +17,11 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index f6521e8d..80fed8eb 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -11,15 +11,15 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/client" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index a09a456c..e757a9e6 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -15,8 +15,8 @@ import ( "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached" "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" -- cgit v1.2.3 From c23a88a943b53b99d112b63ed121931d1f79436f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 29 Aug 2021 23:46:11 +0300 Subject: Implement Init, FromPipeline methods Update receiver in the amqp driver Add simple (initial) boltdb tests Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml | 24 + tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml | 43 ++ tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml | 24 + tests/plugins/jobs/boltdb/.rr-no-global.yaml | 47 ++ tests/plugins/jobs/jobs_boltdb_test.go | 504 +++++++++++++++++++++ 5 files changed, 642 insertions(+) create mode 100644 tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml create mode 100644 tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml create mode 100644 tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml create mode 100644 tests/plugins/jobs/boltdb/.rr-no-global.yaml create mode 100644 tests/plugins/jobs/jobs_boltdb_test.go (limited to 'tests/plugins') diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml new file mode 100644 index 00000000..1d558a63 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0666 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml new file mode 100644 index 00000000..03a8d67e --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0666 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + timeout: 1 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 + + test-2: + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 2 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml new file mode 100644 index 00000000..79493d96 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:guest@127.0.0.1:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml new file mode 100644 index 00000000..1b01eb73 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -0,0 +1,47 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: amqp + priority: 1 + pipeline_size: 100 + queue: test-1-queue + exchange: default + exchange_type: direct + routing_key: test + + test-2: + driver: amqp + priority: 2 + pipeline_size: 100 + queue: test-2-queue + exchange: default + exchange_type: direct + routing_key: test-2 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go new file mode 100644 index 00000000..cf3e5a91 --- /dev/null +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -0,0 +1,504 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/boltdb" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/stretchr/testify/assert" +) + +const ( + rr1db string = "rr1.db" + rr2db string = "rr2.db" +) + +func TestBoltDBInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-init.yaml", + Prefix: "rr", + } + // + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + // + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() + + assert.NoError(t, os.Remove(rr1db)) + assert.NoError(t, os.Remove(rr2db)) +} + +func TestBoltDBDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +// +//func TestAMQPJobsError(t *testing.T) { +// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) +// assert.NoError(t, err) +// +// cfg := &config.Viper{ +// Path: "amqp/.rr-amqp-jobs-err.yaml", +// Prefix: "rr", +// } +// +// controller := gomock.NewController(t) +// mockLogger := mocks.NewMockLogger(controller) +// +// // general +// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) +// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() +// +// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// +// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) +// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) +// +// err = cont.RegisterAll( +// cfg, +// &server.Plugin{}, +// &rpcPlugin.Plugin{}, +// mockLogger, +// &jobs.Plugin{}, +// &resetter.Plugin{}, +// &informer.Plugin{}, +// &amqp.Plugin{}, +// ) +// assert.NoError(t, err) +// +// err = cont.Init() +// if err != nil { +// t.Fatal(err) +// } +// +// ch, err := cont.Serve() +// if err != nil { +// t.Fatal(err) +// } +// +// sig := make(chan os.Signal, 1) +// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) +// +// wg := &sync.WaitGroup{} +// wg.Add(1) +// +// stopCh := make(chan struct{}, 1) +// +// go func() { +// defer wg.Done() +// for { +// select { +// case e := <-ch: +// assert.Fail(t, "error", e.Error.Error()) +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// case <-sig: +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// case <-stopCh: +// // timeout +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// } +// } +// }() +// +// time.Sleep(time.Second * 3) +// +// t.Run("DeclareAMQPPipeline", declareAMQPPipe) +// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) +// t.Run("PushAMQPPipeline", pushToPipe("test-3")) +// time.Sleep(time.Second * 25) +// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) +// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) +// +// time.Sleep(time.Second * 5) +// stopCh <- struct{}{} +// wg.Wait() +//} +// +//func TestAMQPNoGlobalSection(t *testing.T) { +// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) +// assert.NoError(t, err) +// +// cfg := &config.Viper{ +// Path: "amqp/.rr-no-global.yaml", +// Prefix: "rr", +// } +// +// err = cont.RegisterAll( +// cfg, +// &server.Plugin{}, +// &rpcPlugin.Plugin{}, +// &logger.ZapLogger{}, +// &jobs.Plugin{}, +// &resetter.Plugin{}, +// &informer.Plugin{}, +// &amqp.Plugin{}, +// ) +// assert.NoError(t, err) +// +// err = cont.Init() +// if err != nil { +// t.Fatal(err) +// } +// +// _, err = cont.Serve() +// require.Error(t, err) +//} +// +//func TestAMQPStats(t *testing.T) { +// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) +// assert.NoError(t, err) +// +// cfg := &config.Viper{ +// Path: "amqp/.rr-amqp-declare.yaml", +// Prefix: "rr", +// } +// +// controller := gomock.NewController(t) +// mockLogger := mocks.NewMockLogger(controller) +// +// // general +// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() +// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) +// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() +// +// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) +// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) +// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) +// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() +// +// err = cont.RegisterAll( +// cfg, +// &server.Plugin{}, +// &rpcPlugin.Plugin{}, +// mockLogger, +// &jobs.Plugin{}, +// &resetter.Plugin{}, +// &informer.Plugin{}, +// &amqp.Plugin{}, +// ) +// assert.NoError(t, err) +// +// err = cont.Init() +// if err != nil { +// t.Fatal(err) +// } +// +// ch, err := cont.Serve() +// if err != nil { +// t.Fatal(err) +// } +// +// sig := make(chan os.Signal, 1) +// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) +// +// wg := &sync.WaitGroup{} +// wg.Add(1) +// +// stopCh := make(chan struct{}, 1) +// +// go func() { +// defer wg.Done() +// for { +// select { +// case e := <-ch: +// assert.Fail(t, "error", e.Error.Error()) +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// case <-sig: +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// case <-stopCh: +// // timeout +// err = cont.Stop() +// if err != nil { +// assert.FailNow(t, "error", err.Error()) +// } +// return +// } +// } +// }() +// +// time.Sleep(time.Second * 3) +// +// t.Run("DeclareAMQPPipeline", declareAMQPPipe) +// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) +// t.Run("PushAMQPPipeline", pushToPipe("test-3")) +// time.Sleep(time.Second * 2) +// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) +// time.Sleep(time.Second * 2) +// t.Run("PushAMQPPipeline", pushToPipe("test-3")) +// t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) +// +// out := &jobState.State{} +// t.Run("Stats", stats(out)) +// +// assert.Equal(t, out.Pipeline, "test-3") +// assert.Equal(t, out.Driver, "amqp") +// assert.Equal(t, out.Queue, "default") +// +// assert.Equal(t, int64(1), out.Active) +// assert.Equal(t, int64(1), out.Delayed) +// assert.Equal(t, int64(0), out.Reserved) +// assert.Equal(t, false, out.Ready) +// +// time.Sleep(time.Second) +// t.Run("ResumePipeline", resumePipes("test-3")) +// time.Sleep(time.Second * 7) +// +// out = &jobState.State{} +// t.Run("Stats", stats(out)) +// +// assert.Equal(t, out.Pipeline, "test-3") +// assert.Equal(t, out.Driver, "amqp") +// assert.Equal(t, out.Queue, "default") +// +// assert.Equal(t, int64(0), out.Active) +// assert.Equal(t, int64(0), out.Delayed) +// assert.Equal(t, int64(0), out.Reserved) +// assert.Equal(t, true, out.Ready) +// +// time.Sleep(time.Second) +// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) +// +// time.Sleep(time.Second * 5) +// stopCh <- struct{}{} +// wg.Wait() +//} +// + +func declareBoltDBPipe(file string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "boltdb", + "name": "test-3", + "prefetch": "100", + "priority": "3", + "file": file, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) + } +} -- cgit v1.2.3 From f6070d04558ce2e06a114ec2d9a8557d6f88d89b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 29 Aug 2021 23:48:08 +0300 Subject: Update boltdb config files Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml | 2 +- tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml | 2 +- tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml | 4 ++-- tests/plugins/jobs/boltdb/.rr-no-global.yaml | 22 ++++++++-------------- 4 files changed, 12 insertions(+), 18 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml index 1d558a63..cdc2655f 100644 --- a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml @@ -7,7 +7,7 @@ server: relay_timeout: "20s" boltdb: - permissions: 0666 + permissions: 0777 logs: level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml index 03a8d67e..804db543 100644 --- a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml @@ -7,7 +7,7 @@ server: relay_timeout: "20s" boltdb: - permissions: 0666 + permissions: 0777 logs: level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml index 79493d96..d375a9a5 100644 --- a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml @@ -6,8 +6,8 @@ server: relay: "pipes" relay_timeout: "20s" -amqp: - addr: amqp://guest:guest@127.0.0.1:5672/ +boltdb: + permissions: 0777 logs: level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml index 1b01eb73..1c09bef9 100644 --- a/tests/plugins/jobs/boltdb/.rr-no-global.yaml +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -25,22 +25,16 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-1: - driver: amqp - priority: 1 - pipeline_size: 100 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 test-2: - driver: amqp - priority: 2 - pipeline_size: 100 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 1 # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually consume: [ "test-1", "test-2" ] -- cgit v1.2.3 From c7d9385f135853539100430521042f7e7e2ae005 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 30 Aug 2021 21:32:50 +0300 Subject: Tests for the boltdb jobs. Fix issue with Stop in the jobs plugin which didn't destroy the pool. Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/beanstalk/.rr-no-global.yaml | 3 + tests/plugins/jobs/boltdb/.rr-no-global.yaml | 12 +- tests/plugins/jobs/helpers.go | 2 +- tests/plugins/jobs/jobs_beanstalk_test.go | 2 +- tests/plugins/jobs/jobs_boltdb_test.go | 524 ++++++++++++------------ tests/plugins/jobs/jobs_general_test.go | 5 +- tests/plugins/jobs/jobs_sqs_test.go | 14 +- 7 files changed, 286 insertions(+), 276 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml index 87f46069..92d090d4 100644 --- a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -29,3 +29,6 @@ jobs: reserve_timeout: 10s consume: [ "test-1" ] + +endure: + log_level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml index 1c09bef9..54aaf3c6 100644 --- a/tests/plugins/jobs/boltdb/.rr-no-global.yaml +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -26,15 +26,15 @@ jobs: pipelines: test-1: driver: boltdb - prefetch: 100 - file: "rr1.db" - priority: 1 + prefetch: 100 + file: "rr1.db" + priority: 1 test-2: driver: boltdb - prefetch: 100 - file: "rr2.db" - priority: 1 + prefetch: 100 + file: "rr2.db" + priority: 1 # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually consume: [ "test-1", "test-2" ] diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 5067ef9f..6c2d05ca 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -95,7 +95,7 @@ func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ Job: "some/php/namespace", - Id: "1", + Id: "2", Payload: `{"hello":"world"}`, Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, Options: &jobsv1beta.Options{ diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 9f4d37ec..78d154b1 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -466,7 +466,7 @@ func TestBeanstalkStats(t *testing.T) { } func TestBeanstalkNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go index cf3e5a91..15d2bce8 100644 --- a/tests/plugins/jobs/jobs_boltdb_test.go +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -12,6 +12,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -22,6 +23,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -222,266 +224,268 @@ func TestBoltDBDeclare(t *testing.T) { assert.NoError(t, os.Remove(rr1db)) } -// -//func TestAMQPJobsError(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-amqp-jobs-err.yaml", -// Prefix: "rr", -// } -// -// controller := gomock.NewController(t) -// mockLogger := mocks.NewMockLogger(controller) -// -// // general -// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() -// -// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// -// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) -// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// mockLogger, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// ch, err := cont.Serve() -// if err != nil { -// t.Fatal(err) -// } -// -// sig := make(chan os.Signal, 1) -// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// -// stopCh := make(chan struct{}, 1) -// -// go func() { -// defer wg.Done() -// for { -// select { -// case e := <-ch: -// assert.Fail(t, "error", e.Error.Error()) -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// case <-sig: -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// case <-stopCh: -// // timeout -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// } -// } -// }() -// -// time.Sleep(time.Second * 3) -// -// t.Run("DeclareAMQPPipeline", declareAMQPPipe) -// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// time.Sleep(time.Second * 25) -// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) -// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -// -// time.Sleep(time.Second * 5) -// stopCh <- struct{}{} -// wg.Wait() -//} -// -//func TestAMQPNoGlobalSection(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-no-global.yaml", -// Prefix: "rr", -// } -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// &logger.ZapLogger{}, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// _, err = cont.Serve() -// require.Error(t, err) -//} -// -//func TestAMQPStats(t *testing.T) { -// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) -// assert.NoError(t, err) -// -// cfg := &config.Viper{ -// Path: "amqp/.rr-amqp-declare.yaml", -// Prefix: "rr", -// } -// -// controller := gomock.NewController(t) -// mockLogger := mocks.NewMockLogger(controller) -// -// // general -// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() -// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) -// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() -// -// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) -// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) -// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) -// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() -// -// err = cont.RegisterAll( -// cfg, -// &server.Plugin{}, -// &rpcPlugin.Plugin{}, -// mockLogger, -// &jobs.Plugin{}, -// &resetter.Plugin{}, -// &informer.Plugin{}, -// &amqp.Plugin{}, -// ) -// assert.NoError(t, err) -// -// err = cont.Init() -// if err != nil { -// t.Fatal(err) -// } -// -// ch, err := cont.Serve() -// if err != nil { -// t.Fatal(err) -// } -// -// sig := make(chan os.Signal, 1) -// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// -// stopCh := make(chan struct{}, 1) -// -// go func() { -// defer wg.Done() -// for { -// select { -// case e := <-ch: -// assert.Fail(t, "error", e.Error.Error()) -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// case <-sig: -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// case <-stopCh: -// // timeout -// err = cont.Stop() -// if err != nil { -// assert.FailNow(t, "error", err.Error()) -// } -// return -// } -// } -// }() -// -// time.Sleep(time.Second * 3) -// -// t.Run("DeclareAMQPPipeline", declareAMQPPipe) -// t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// time.Sleep(time.Second * 2) -// t.Run("PauseAMQPPipeline", pausePipelines("test-3")) -// time.Sleep(time.Second * 2) -// t.Run("PushAMQPPipeline", pushToPipe("test-3")) -// t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) -// -// out := &jobState.State{} -// t.Run("Stats", stats(out)) -// -// assert.Equal(t, out.Pipeline, "test-3") -// assert.Equal(t, out.Driver, "amqp") -// assert.Equal(t, out.Queue, "default") -// -// assert.Equal(t, int64(1), out.Active) -// assert.Equal(t, int64(1), out.Delayed) -// assert.Equal(t, int64(0), out.Reserved) -// assert.Equal(t, false, out.Ready) -// -// time.Sleep(time.Second) -// t.Run("ResumePipeline", resumePipes("test-3")) -// time.Sleep(time.Second * 7) -// -// out = &jobState.State{} -// t.Run("Stats", stats(out)) -// -// assert.Equal(t, out.Pipeline, "test-3") -// assert.Equal(t, out.Driver, "amqp") -// assert.Equal(t, out.Queue, "default") -// -// assert.Equal(t, int64(0), out.Active) -// assert.Equal(t, int64(0), out.Delayed) -// assert.Equal(t, int64(0), out.Reserved) -// assert.Equal(t, true, out.Ready) -// -// time.Sleep(time.Second) -// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) -// -// time.Sleep(time.Second * 5) -// stopCh <- struct{}{} -// wg.Wait() -//} -// +func TestBoltDBJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-jobs-err.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + // + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func TestBoltDBStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + //controller := gomock.NewController(t) + //mockLogger := mocks.NewMockLogger(controller) + // + //// general + //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + // + //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + //mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} func declareBoltDBPipe(file string) func(t *testing.T) { return func(t *testing.T) { diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 91855ee9..951d6227 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -171,9 +171,12 @@ func TestJOBSMetrics(t *testing.T) { signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) tt := time.NewTimer(time.Minute * 3) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { defer tt.Stop() + defer wg.Done() for { select { case e := <-ch: @@ -220,7 +223,7 @@ func TestJOBSMetrics(t *testing.T) { assert.Contains(t, genericOut, "workers_memory_bytes") close(sig) - time.Sleep(time.Second * 2) + wg.Wait() } const getAddr = "http://127.0.0.1:2112/metrics" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 95abe9dc..2dd2c8db 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -437,15 +437,15 @@ func TestSQSStat(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareSQSPipeline", declareSQSPipe) - t.Run("ConsumeSQSPipeline", resumePipes("test-3")) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareSQSPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseSQSPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} @@ -474,7 +474,7 @@ func TestSQSStat(t *testing.T) { assert.Equal(t, int64(0), out.Delayed) assert.Equal(t, int64(0), out.Reserved) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} -- cgit v1.2.3 From 0c10cb989fa1deae3996df272f40e2270a880b52 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 31 Aug 2021 11:54:24 +0300 Subject: Finish boltdb tests Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/jobs_beanstalk_test.go | 2 +- tests/plugins/jobs/jobs_boltdb_test.go | 150 +++++++++++++++--------------- 2 files changed, 75 insertions(+), 77 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 78d154b1..9f4d37ec 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -466,7 +466,7 @@ func TestBeanstalkStats(t *testing.T) { } func TestBeanstalkNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go index 15d2bce8..ab36ffa4 100644 --- a/tests/plugins/jobs/jobs_boltdb_test.go +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" @@ -22,6 +23,7 @@ import ( rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,30 +41,29 @@ func TestBoltDBInit(t *testing.T) { Path: "boltdb/.rr-boltdb-init.yaml", Prefix: "rr", } - // - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - // - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("boltdb listener stopped").Times(4) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -132,30 +133,29 @@ func TestBoltDBDeclare(t *testing.T) { Prefix: "rr", } - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - // - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -233,31 +233,30 @@ func TestBoltDBJobsError(t *testing.T) { Prefix: "rr", } - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - // - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -364,29 +363,28 @@ func TestBoltDBStats(t *testing.T) { Prefix: "rr", } - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").AnyTimes() err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, -- cgit v1.2.3