diff options
Diffstat (limited to 'tests/plugins')
-rw-r--r-- | tests/plugins/jobs/amqp/.rr-no-global.yaml (renamed from tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml) | 36 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-list.yaml | 91 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml | 78 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml | 78 | ||||
-rw-r--r-- | tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml (renamed from tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml) | 0 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 31 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 103 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 125 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 290 |
9 files changed, 263 insertions, 569 deletions
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml b/tests/plugins/jobs/amqp/.rr-no-global.yaml index 0d141b2b..1b01eb73 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml +++ b/tests/plugins/jobs/amqp/.rr-no-global.yaml @@ -24,52 +24,24 @@ jobs: # list of broker pipelines associated with endpoints pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - test-1: driver: amqp priority: 1 - pipeline_size: 1000000 + pipeline_size: 100 queue: test-1-queue exchange: default exchange_type: direct routing_key: test - test-2-amqp: + test-2: driver: amqp priority: 2 - pipeline_size: 100000 + pipeline_size: 100 queue: test-2-queue exchange: default exchange_type: direct routing_key: test-2 - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] + consume: [ "test-1", "test-2" ] diff --git a/tests/plugins/jobs/configs/.rr-jobs-list.yaml b/tests/plugins/jobs/configs/.rr-jobs-list.yaml deleted file mode 100644 index 3d22a098..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-list.yaml +++ /dev/null @@ -1,91 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - - # beanstalk configuration -beanstalk: - addr: tcp://localhost:11300 - - # amazon sqs configuration -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://localhost:9324 - declare: - MessageRetentionPeriod: 86400 - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml deleted file mode 100644 index 8789e872..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml +++ /dev/null @@ -1,78 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml deleted file mode 100644 index 8789e872..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml +++ /dev/null @@ -1,78 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - test-1: - driver: amqp - priority: 1 - pipeline_size: 1000000 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2-amqp: - driver: amqp - priority: 2 - pipeline_size: 100000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - - test-3: - # priority: 11 - not defined, 10 by default - # driver locality not specified, local by default - driver: sqs - pipeline_size: 1000000 - queue: default - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml index dc5bc3a1..dc5bc3a1 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 8f87072b..cbbf43d8 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -24,6 +24,7 @@ import ( jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAMQPInit(t *testing.T) { @@ -239,3 +240,33 @@ func declareAMQPPipe(t *testing.T) { err = client.Call("jobs.Declare", pipe, er) assert.NoError(t, err) } + +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 37e25970..04d95506 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -244,3 +244,106 @@ func consumeEphemeralPipe(t *testing.T) { err = client.Call("jobs.Resume", pipe, er) assert.NoError(t, err) } + +func TestEphemeralPauseResume(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + // TODO delete + mockLogger.EXPECT().Debug("request", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("response", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("ephemeralPause", pausePipelines("test-local")) + t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) + t.Run("ephemeralResume", resumePipes("test-local")) + t.Run("pushToEnabledPipe", pushToPipe("test-local")) + + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go new file mode 100644 index 00000000..829fd102 --- /dev/null +++ b/tests/plugins/jobs/jobs_general_test.go @@ -0,0 +1,125 @@ +package jobs + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestJobsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-jobs-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go deleted file mode 100644 index 54015f03..00000000 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ /dev/null @@ -1,290 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestJobsInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestJobsNoAMQPGlobal(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-init-no-amqp-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - -func TestJobsPauseResume(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-pause-resume-ephemeral.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - // TODO delete - mockLogger.EXPECT().Debug("request", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("response", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("ephemeralPause", ephemeralPause) - t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - t.Run("ephemeralResume", ephemeralResume) - t.Run("pushToEnabledPipe", pushToPipe("test-local")) - - time.Sleep(time.Second * 1) - - stopCh <- struct{}{} - wg.Wait() -} - -func ephemeralPause(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-local" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Pause", pipe, er) - assert.NoError(t, err) -} - -func ephemeralResume(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-local" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} |