From 9079cc51599d6e21ac59e34573f7bbf2e2e87b9e Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 23 Jul 2021 13:25:09 +0300 Subject: Tests refactoring Signed-off-by: Valery Piashchynski --- tests/plugins/jobs/amqp/.rr-no-global.yaml | 47 ++++ .../jobs/configs/.rr-jobs-init-no-amqp-global.yaml | 75 ------ tests/plugins/jobs/configs/.rr-jobs-list.yaml | 91 ------- .../jobs/configs/.rr-jobs-pause-resume-all.yaml | 78 ------ .../jobs/configs/.rr-jobs-pause-resume-amqp.yaml | 78 ------ .../configs/.rr-jobs-pause-resume-ephemeral.yaml | 44 ---- .../jobs/ephemeral/.rr-ephemeral-pause-resume.yaml | 44 ++++ tests/plugins/jobs/jobs_amqp_test.go | 31 +++ tests/plugins/jobs/jobs_ephemeral_test.go | 103 ++++++++ tests/plugins/jobs/jobs_general_test.go | 125 +++++++++ tests/plugins/jobs/jobs_plugin_test.go | 290 --------------------- 11 files changed, 350 insertions(+), 656 deletions(-) create mode 100644 tests/plugins/jobs/amqp/.rr-no-global.yaml delete mode 100644 tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml delete mode 100644 tests/plugins/jobs/configs/.rr-jobs-list.yaml delete mode 100644 tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml delete mode 100644 tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml delete mode 100644 tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml create mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml create mode 100644 tests/plugins/jobs/jobs_general_test.go delete mode 100644 tests/plugins/jobs/jobs_plugin_test.go diff --git a/tests/plugins/jobs/amqp/.rr-no-global.yaml b/tests/plugins/jobs/amqp/.rr-no-global.yaml new file mode 100644 index 00000000..1b01eb73 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-no-global.yaml @@ -0,0 +1,47 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: amqp + priority: 1 + pipeline_size: 100 + queue: test-1-queue + exchange: default + exchange_type: direct + routing_key: test + + test-2: + driver: amqp + priority: 2 + pipeline_size: 100 + queue: test-2-queue + exchange: default + exchange_type: direct + routing_key: test-2 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml b/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml deleted file mode 100644 index 0d141b2b..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml +++ /dev/null @@ -1,75 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: error - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-list.yaml b/tests/plugins/jobs/configs/.rr-jobs-list.yaml deleted file mode 100644 index 3d22a098..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-list.yaml +++ /dev/null @@ -1,91 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - - # beanstalk configuration -beanstalk: - addr: tcp://localhost:11300 - - # amazon sqs configuration -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://localhost:9324 - declare: - MessageRetentionPeriod: 86400 - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml deleted file mode 100644 index 8789e872..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml +++ /dev/null @@ -1,78 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml deleted file mode 100644 index 8789e872..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml +++ /dev/null @@ -1,78 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml deleted file mode 100644 index dc5bc3a1..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml +++ /dev/null @@ -1,44 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: info - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2" ] - diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml new file mode 100644 index 00000000..dc5bc3a1 --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml @@ -0,0 +1,44 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: info + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-local: + driver: ephemeral + priority: 10 + pipeline_size: 10000 + + test-local-2: + driver: ephemeral + priority: 1 + pipeline_size: 10000 + + test-local-3: + driver: ephemeral + priority: 2 + pipeline_size: 10000 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-local", "test-local-2" ] + diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 8f87072b..cbbf43d8 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -24,6 +24,7 @@ import ( jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAMQPInit(t *testing.T) { @@ -239,3 +240,33 @@ func declareAMQPPipe(t *testing.T) { err = client.Call("jobs.Declare", pipe, er) assert.NoError(t, err) } + +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 37e25970..04d95506 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -244,3 +244,106 @@ func consumeEphemeralPipe(t *testing.T) { err = client.Call("jobs.Resume", pipe, er) assert.NoError(t, err) } + +func TestEphemeralPauseResume(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + // TODO delete + mockLogger.EXPECT().Debug("request", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("response", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("ephemeralPause", pausePipelines("test-local")) + t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) + t.Run("ephemeralResume", resumePipes("test-local")) + t.Run("pushToEnabledPipe", pushToPipe("test-local")) + + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go new file mode 100644 index 00000000..829fd102 --- /dev/null +++ b/tests/plugins/jobs/jobs_general_test.go @@ -0,0 +1,125 @@ +package jobs + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestJobsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-jobs-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go deleted file mode 100644 index 54015f03..00000000 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ /dev/null @@ -1,290 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestJobsInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestJobsNoAMQPGlobal(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-init-no-amqp-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - -func TestJobsPauseResume(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-pause-resume-ephemeral.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - // TODO delete - mockLogger.EXPECT().Debug("request", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("response", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("ephemeralPause", ephemeralPause) - t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - t.Run("ephemeralResume", ephemeralResume) - t.Run("pushToEnabledPipe", pushToPipe("test-local")) - - time.Sleep(time.Second * 1) - - stopCh <- struct{}{} - wg.Wait() -} - -func ephemeralPause(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-local" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Pause", pipe, er) - assert.NoError(t, err) -} - -func ephemeralResume(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-local" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} -- cgit v1.2.3