diff options
author | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
commit | ab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch) | |
tree | 0a58b043605ef1d9b09e75b207c236aacb1ed55a /tests | |
parent | bd0da830ae345e1ed4a67782bf413673beeba037 (diff) |
Update to go 1.17
Add Stat with tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 9 | ||||
-rw-r--r-- | tests/plugins/jobs/helpers.go | 48 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 171 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 128 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 139 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 128 |
6 files changed, 595 insertions, 28 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index c7041cc9..3dcc5c2c 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -279,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) time.Sleep(time.Second * 5) @@ -291,6 +294,8 @@ func TestBroadcastSameSubscriber(t *testing.T) { t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { @@ -395,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) time.Sleep(time.Second * 4) @@ -404,6 +412,7 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index a268ebb8..9c314494 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -8,6 +8,7 @@ import ( "testing" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +19,7 @@ const ( pause string = "jobs.Pause" destroy string = "jobs.Destroy" resume string = "jobs.Resume" + stat string = "jobs.Stat" ) func resumePipes(pipes ...string) func(t *testing.T) { @@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) { er := &jobsv1beta.Empty{} err = client.Call(push, req, er) - assert.Error(t, err) + assert.NoError(t, err) } } @@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) { } } +func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: delay, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.NoError(t, err) + } +} + func pushToPipeErr(pipeline string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") @@ -183,3 +209,23 @@ func deleteProxy(name string, t *testing.T) { _ = resp.Body.Close() } } + +func stats(t *testing.T, state *jobState.State) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + st := &jobsv1beta.Stats{} + er := &jobsv1beta.Empty{} + + err = client.Call(stat, er, st) + require.NoError(t, err) + require.NotNil(t, st) + + state.Queue = st.Stats[0].Queue + state.Pipeline = st.Stats[0].Pipeline + state.Driver = st.Stats[0].Driver + state.Active = st.Stats[0].Active + state.Delayed = st.Stats[0].Delayed + state.Reserved = st.Stats[0].Reserved +} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 7096a467..4c1f600a 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -305,44 +306,64 @@ func TestAMQPJobsError(t *testing.T) { wg.Wait() } -func declareAMQPPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "amqp", - "name": "test-3", - "routing-key": "test-3", - "queue": "default", - "exchange-type": "direct", - "exchange": "amqp.default", - "prefetch": "100", - "priority": "3", - "exclusive": "true", - "multiple_ask": "true", - "requeue_on_fail": "true", - }} + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) } -func TestAMQPNoGlobalSection(t *testing.T) { +func TestAMQPStats(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "amqp/.rr-no-global.yaml", + Path: "amqp/.rr-amqp-declare.yaml", Prefix: "rr", } + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -355,6 +376,110 @@ func TestAMQPNoGlobalSection(t *testing.T) { t.Fatal(err) } - _, err = cont.Serve() - require.Error(t, err) + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + stats(t, out) + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "amqp") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "amqp") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "amqp", + "name": "test-3", + "routing-key": "test-3", + "queue": "default", + "exchange-type": "direct", + "exchange": "amqp.default", + "prefetch": "100", + "priority": "3", + "exclusive": "true", + "multiple_ask": "true", + "requeue_on_fail": "true", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) } diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index aebe9da1..2f56acab 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -312,6 +313,133 @@ func TestBeanstalkJobsError(t *testing.T) { wg.Wait() } +func TestBeanstalkStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBeanstalkPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 3) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) + + out := &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "beanstalk") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(1), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "beanstalk") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func TestBeanstalkNoGlobalSection(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 0a882556..3f296c83 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -129,6 +130,8 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -201,8 +204,6 @@ func TestEphemeralDeclare(t *testing.T) { time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() - - time.Sleep(time.Second * 5) } func TestEphemeralPauseResume(t *testing.T) { @@ -224,7 +225,9 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -290,14 +293,15 @@ func TestEphemeralPauseResume(t *testing.T) { time.Sleep(time.Second * 3) + t.Run("ephemeralResume", resumePipes("test-local")) t.Run("ephemeralPause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) t.Run("ephemeralResume", resumePipes("test-local")) t.Run("pushToEnabledPipe", pushToPipe("test-local")) - time.Sleep(time.Second * 1) stopCh <- struct{}{} + time.Sleep(time.Second) wg.Wait() } @@ -319,6 +323,8 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -394,6 +400,131 @@ func TestEphemeralJobsError(t *testing.T) { wg.Wait() } +func TestEphemeralStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + + time.Sleep(time.Second) + out := &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(1)) + assert.Equal(t, out.Delayed, int64(1)) + assert.Equal(t, out.Reserved, int64(0)) + + time.Sleep(time.Second) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(0)) + assert.Equal(t, out.Delayed, int64(0)) + assert.Equal(t, out.Reserved, int64(0)) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func declareEphemeralPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index cb1f4486..e5a92916 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -342,6 +343,133 @@ func TestSQSNoGlobalSection(t *testing.T) { require.Error(t, err) } +func TestSQSStat(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareSQSPipeline", declareSQSPipe) + t.Run("ConsumeSQSPipeline", resumePipes("test-3")) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseSQSPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + + out := &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "sqs") + assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + stats(t, out) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "sqs") + assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func declareSQSPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) |