diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 /tests | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/jobs_err.php | 52 | ||||
-rw-r--r-- | tests/jobs_ok.php | 32 | ||||
-rw-r--r-- | tests/plugins/jobs/amqp/.rr-amqp-declare.yaml | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/amqp/.rr-amqp-init.yaml | 4 | ||||
-rw-r--r-- | tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml | 27 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-no-global.yaml | 31 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 115 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 160 |
11 files changed, 425 insertions, 26 deletions
diff --git a/tests/jobs_err.php b/tests/jobs_err.php new file mode 100644 index 00000000..4ccea4f8 --- /dev/null +++ b/tests/jobs_err.php @@ -0,0 +1,52 @@ +<?php + +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; +use Spiral\Goridge\StreamRelay; + +require __DIR__ . "/vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT)); + +while ($in = $rr->waitPayload()) { + try { + $ctx = json_decode($in->header, true); + $headers = $ctx['headers']; + + $set = isset($headers['attempts']); + + $val = 0; + + if ($set == true) { + $val = intval($headers['attempts'][0]); + $val++; + $headers['attempts'][0] = strval($val); + } else { + $headers['attempts'][0] = "1"; + }; + + if ($val > 3) { + $rr->respond(new RoadRunner\Payload(json_encode([ + // no error + 'type' => 0, + 'data' => [] + ]))); + } else { + $rr->respond(new RoadRunner\Payload(json_encode([ + 'type' => 1, + 'data' => [ + 'message' => 'error', + 'requeue' => true, + 'delay_seconds' => 5, + 'headers' => $headers + ] + ]))); + } + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/tests/jobs_ok.php b/tests/jobs_ok.php new file mode 100644 index 00000000..fa58dd9a --- /dev/null +++ b/tests/jobs_ok.php @@ -0,0 +1,32 @@ +<?php + +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; +use Spiral\Goridge\StreamRelay; + +require __DIR__ . "/vendor/autoload.php"; + +$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT)); + +while ($in = $rr->waitPayload()) { + try { + $ctx = json_decode($in->header, true); + $headers = $ctx['headers']; + + $rr->respond(new RoadRunner\Payload(json_encode([ + 'type' => 0, + 'data' => [ + 'message' => 'error', + 'requeue' => true, + 'delay_seconds' => 10, + 'headers' => $headers + ] + ]))); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml index 32883ce2..f9a7308b 100644 --- a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml +++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml index c06b5a79..43840545 100644 --- a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml +++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" @@ -15,7 +15,7 @@ logs: mode: development jobs: - num_pollers: 10 + num_pollers: 1 pipeline_size: 100000 timeout: 1 pool: diff --git a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml new file mode 100644 index 00000000..79493d96 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml index 022bf2f4..3555ef96 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml index 8ded8cf1..cf9069a8 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml @@ -2,7 +2,7 @@ rpc: listen: tcp://127.0.0.1:6001 server: - command: "php ../../client.php echo pipes" + command: "php ../../jobs_ok.php" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml new file mode 100644 index 00000000..a4f31290 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + # beanstalk address + addr: tcp://127.0.0.1:11300 + # connect timeout + timeout: 10s + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml new file mode 100644 index 00000000..87f46069 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -0,0 +1,31 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: beanstalk + priority: 11 + tube_priority: 1 + tube: default-1 + reserve_timeout: 10s + + consume: [ "test-1" ] diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index cbbf43d8..bb5281c0 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -48,6 +48,9 @@ func TestAMQPInit(t *testing.T) { mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -59,8 +62,7 @@ func TestAMQPInit(t *testing.T) { cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -136,22 +138,116 @@ func TestAMQPDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -209,6 +305,7 @@ func TestAMQPDeclare(t *testing.T) { t.Run("DeclareAMQPPipeline", declareAMQPPipe) t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) t.Run("PauseAMQPPipeline", pausePipelines("test-3")) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index b36b4977..916ac08f 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -24,6 +24,7 @@ import ( jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestBeanstalkInit(t *testing.T) { @@ -47,19 +48,21 @@ func TestBeanstalkInit(t *testing.T) { mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -135,22 +138,123 @@ func TestBeanstalkDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) + t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) + t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} +func TestBeanstalkJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - // mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -208,7 +312,9 @@ func TestBeanstalkDeclare(t *testing.T) { t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) @@ -216,6 +322,36 @@ func TestBeanstalkDeclare(t *testing.T) { wg.Wait() } +func TestBeanstalkNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + func declareBeanstalkPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) |