summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
committerValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
commit2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
treed796a11941fab4be668843a3fcbd83ea0859db39 /tests
parente855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests')
-rw-r--r--tests/jobs_err.php52
-rw-r--r--tests/jobs_ok.php32
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-declare.yaml2
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-init.yaml4
-rw-r--r--tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml24
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml2
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml2
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml27
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-no-global.yaml31
-rw-r--r--tests/plugins/jobs/jobs_amqp_test.go115
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go160
11 files changed, 425 insertions, 26 deletions
diff --git a/tests/jobs_err.php b/tests/jobs_err.php
new file mode 100644
index 00000000..4ccea4f8
--- /dev/null
+++ b/tests/jobs_err.php
@@ -0,0 +1,52 @@
+<?php
+
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+use Spiral\Goridge\StreamRelay;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT));
+
+while ($in = $rr->waitPayload()) {
+ try {
+ $ctx = json_decode($in->header, true);
+ $headers = $ctx['headers'];
+
+ $set = isset($headers['attempts']);
+
+ $val = 0;
+
+ if ($set == true) {
+ $val = intval($headers['attempts'][0]);
+ $val++;
+ $headers['attempts'][0] = strval($val);
+ } else {
+ $headers['attempts'][0] = "1";
+ };
+
+ if ($val > 3) {
+ $rr->respond(new RoadRunner\Payload(json_encode([
+ // no error
+ 'type' => 0,
+ 'data' => []
+ ])));
+ } else {
+ $rr->respond(new RoadRunner\Payload(json_encode([
+ 'type' => 1,
+ 'data' => [
+ 'message' => 'error',
+ 'requeue' => true,
+ 'delay_seconds' => 5,
+ 'headers' => $headers
+ ]
+ ])));
+ }
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+}
diff --git a/tests/jobs_ok.php b/tests/jobs_ok.php
new file mode 100644
index 00000000..fa58dd9a
--- /dev/null
+++ b/tests/jobs_ok.php
@@ -0,0 +1,32 @@
+<?php
+
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+use Spiral\Goridge\StreamRelay;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner\Worker(new StreamRelay(\STDIN, \STDOUT));
+
+while ($in = $rr->waitPayload()) {
+ try {
+ $ctx = json_decode($in->header, true);
+ $headers = $ctx['headers'];
+
+ $rr->respond(new RoadRunner\Payload(json_encode([
+ 'type' => 0,
+ 'data' => [
+ 'message' => 'error',
+ 'requeue' => true,
+ 'delay_seconds' => 10,
+ 'headers' => $headers
+ ]
+ ])));
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+}
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
index 32883ce2..f9a7308b 100644
--- a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
+++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
index c06b5a79..43840545 100644
--- a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
+++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
@@ -15,7 +15,7 @@ logs:
mode: development
jobs:
- num_pollers: 10
+ num_pollers: 1
pipeline_size: 100000
timeout: 1
pool:
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml
new file mode 100644
index 00000000..79493d96
--- /dev/null
+++ b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml
@@ -0,0 +1,24 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../jobs_err.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:[email protected]:5672/
+
+logs:
+ level: debug
+ encoding: console
+ mode: development
+
+jobs:
+ num_pollers: 1
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml
index 022bf2f4..3555ef96 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
index 8ded8cf1..cf9069a8 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
@@ -2,7 +2,7 @@ rpc:
listen: tcp://127.0.0.1:6001
server:
- command: "php ../../client.php echo pipes"
+ command: "php ../../jobs_ok.php"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
new file mode 100644
index 00000000..a4f31290
--- /dev/null
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
@@ -0,0 +1,27 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../jobs_err.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+beanstalk:
+ # beanstalk address
+ addr: tcp://127.0.0.1:11300
+ # connect timeout
+ timeout: 10s
+
+logs:
+ level: debug
+ encoding: console
+ mode: development
+
+jobs:
+ num_pollers: 10
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
new file mode 100644
index 00000000..87f46069
--- /dev/null
+++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
@@ -0,0 +1,31 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../jobs_ok.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+logs:
+ level: error
+ mode: development
+
+jobs:
+ num_pollers: 10
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-1:
+ driver: beanstalk
+ priority: 11
+ tube_priority: 1
+ tube: default-1
+ reserve_timeout: 10s
+
+ consume: [ "test-1" ]
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
index cbbf43d8..bb5281c0 100644
--- a/tests/plugins/jobs/jobs_amqp_test.go
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -48,6 +48,9 @@ func TestAMQPInit(t *testing.T) {
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -59,8 +62,7 @@ func TestAMQPInit(t *testing.T) {
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -136,22 +138,116 @@ func TestAMQPDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareAMQPPipeline", declareAMQPPipe)
+ t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+ t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestAMQPJobsError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "amqp/.rr-amqp-jobs-err.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -209,6 +305,7 @@ func TestAMQPDeclare(t *testing.T) {
t.Run("DeclareAMQPPipeline", declareAMQPPipe)
t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 25)
t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index b36b4977..916ac08f 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -24,6 +24,7 @@ import (
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/spiral/roadrunner/v2/tests/mocks"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestBeanstalkInit(t *testing.T) {
@@ -47,19 +48,21 @@ func TestBeanstalkInit(t *testing.T) {
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-
- mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -135,22 +138,123 @@ func TestBeanstalkDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
- mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &beanstalk.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe)
+ t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3"))
+ t.Run("PushBeanstalkPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseBeanstalkPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+ t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+func TestBeanstalkJobsError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "beanstalk/.rr-beanstalk-jobs-err.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
+
+ mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- // mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -208,7 +312,9 @@ func TestBeanstalkDeclare(t *testing.T) {
t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe)
t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3"))
t.Run("PushBeanstalkPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 25)
t.Run("PauseBeanstalkPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
@@ -216,6 +322,36 @@ func TestBeanstalkDeclare(t *testing.T) {
wg.Wait()
}
+func TestBeanstalkNoGlobalSection(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "beanstalk/.rr-no-global.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &beanstalk.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ require.Error(t, err)
+}
+
func declareBeanstalkPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)