diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 17:26:42 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-18 17:26:42 +0300 |
commit | 324407b3e2d779143be65872993c4d091abb1d38 (patch) | |
tree | e6f0bd64241ab2d4dc05809128c8e8d7d74cbcc4 /tests/plugins | |
parent | a5435be8ab58bd23f1c2d3afd4484dd1d86b6002 (diff) | |
parent | eb70b89cb2f23ccd44b91bbcac7438a05a40c801 (diff) |
#764: feat(stat): `job` plugin drivers statistic
#764: feat(stat): `job` plugin drivers statistic
Diffstat (limited to 'tests/plugins')
23 files changed, 968 insertions, 165 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index a78b17e1..3dcc5c2c 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -1,6 +1,7 @@ package broadcast import ( + "context" "net" "net/rpc" "os" @@ -10,6 +11,7 @@ import ( "testing" "time" + goRedis "github.com/go-redis/redis/v8" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" @@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) @@ -274,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) time.Sleep(time.Second * 5) @@ -283,9 +291,17 @@ func TestBroadcastSameSubscriber(t *testing.T) { stopCh <- struct{}{} wg.Wait() + + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) @@ -384,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) time.Sleep(time.Second * 4) @@ -393,7 +412,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) + + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { @@ -446,6 +469,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) { } } } + func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:"+port) @@ -475,3 +499,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request { return m } + +func redisFlushAll(addr string) func(t *testing.T) { + return func(t *testing.T) { + rdb := goRedis.NewClient(&goRedis.Options{ + Addr: addr, + Password: "", // no password set + DB: 0, // use default DB + }) + + rdb.FlushAll(context.Background()) + } +} diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index 01ad1479..ed5139a8 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,14 +16,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +44,16 @@ func (p *Plugin1) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + errCh <- err + return } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) } }() @@ -68,8 +64,7 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index ee072ffe..20cc1b24 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,13 +16,14 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -40,22 +43,20 @@ func (p *Plugin2) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) } }() @@ -64,7 +65,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index 288201d1..2f416d2e 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin3) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) } }() @@ -66,7 +65,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index 56f79c0f..e2209648 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin4) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index e7cd7e60..122046b8 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin5) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 08272196..6ace0a79 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin6) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index bd804264..a48c8972 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -22,7 +22,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" "github.com/spiral/roadrunner/v2/plugins/informer" diff --git a/tests/plugins/informer/informer_test.go b/tests/plugins/informer/informer_test.go index 61be85a1..c3b5c6a6 100644 --- a/tests/plugins/informer/informer_test.go +++ b/tests/plugins/informer/informer_test.go @@ -12,7 +12,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 095140b8..21897f40 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -5,7 +5,7 @@ import ( "time" "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/server" ) diff --git a/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml new file mode 100644 index 00000000..4db9a676 --- /dev/null +++ b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +metrics: + address: 127.0.0.1:2112 + +logs: + level: info + encoding: console + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 1 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index a268ebb8..5067ef9f 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -8,6 +8,7 @@ import ( "testing" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +19,7 @@ const ( pause string = "jobs.Pause" destroy string = "jobs.Destroy" resume string = "jobs.Resume" + stat string = "jobs.Stat" ) func resumePipes(pipes ...string) func(t *testing.T) { @@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) { er := &jobsv1beta.Empty{} err = client.Call(push, req, er) - assert.Error(t, err) + assert.NoError(t, err) } } @@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) { } } +func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: delay, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.NoError(t, err) + } +} + func pushToPipeErr(pipeline string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") @@ -183,3 +209,26 @@ func deleteProxy(name string, t *testing.T) { _ = resp.Body.Close() } } + +func stats(state *jobState.State) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + st := &jobsv1beta.Stats{} + er := &jobsv1beta.Empty{} + + err = client.Call(stat, er, st) + require.NoError(t, err) + require.NotNil(t, st) + + state.Queue = st.Stats[0].Queue + state.Pipeline = st.Stats[0].Pipeline + state.Driver = st.Stats[0].Driver + state.Active = st.Stats[0].Active + state.Delayed = st.Stats[0].Delayed + state.Reserved = st.Stats[0].Reserved + state.Ready = st.Stats[0].Ready + } +} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 7096a467..df84fabc 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -133,6 +134,10 @@ func TestAMQPDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -228,6 +233,10 @@ func TestAMQPJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) @@ -305,44 +314,67 @@ func TestAMQPJobsError(t *testing.T) { wg.Wait() } -func declareAMQPPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "amqp", - "name": "test-3", - "routing-key": "test-3", - "queue": "default", - "exchange-type": "direct", - "exchange": "amqp.default", - "prefetch": "100", - "priority": "3", - "exclusive": "true", - "multiple_ask": "true", - "requeue_on_fail": "true", - }} + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) } -func TestAMQPNoGlobalSection(t *testing.T) { +func TestAMQPStats(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "amqp/.rr-no-global.yaml", + Path: "amqp/.rr-amqp-declare.yaml", Prefix: "rr", } + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -355,6 +387,113 @@ func TestAMQPNoGlobalSection(t *testing.T) { t.Fatal(err) } - _, err = cont.Serve() - require.Error(t, err) + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "amqp") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "amqp") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) + + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "amqp", + "name": "test-3", + "routing-key": "test-3", + "queue": "default", + "exchange-type": "direct", + "exchange": "amqp.default", + "prefetch": "100", + "priority": "3", + "exclusive": "true", + "multiple_ask": "true", + "requeue_on_fail": "true", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) } diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index aebe9da1..31b80e35 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -134,6 +135,10 @@ func TestBeanstalkDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() @@ -235,6 +240,10 @@ func TestBeanstalkJobsError(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() @@ -312,6 +321,138 @@ func TestBeanstalkJobsError(t *testing.T) { wg.Wait() } +func TestBeanstalkStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBeanstalkPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 3) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 10)) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 5) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "beanstalk") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 15) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "beanstalk") + assert.Equal(t, out.Queue, "default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second) + stopCh <- struct{}{} + wg.Wait() +} + func TestBeanstalkNoGlobalSection(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 0a882556..162579c8 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -129,6 +130,12 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -201,8 +208,6 @@ func TestEphemeralDeclare(t *testing.T) { time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() - - time.Sleep(time.Second * 5) } func TestEphemeralPauseResume(t *testing.T) { @@ -224,7 +229,13 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -290,14 +301,15 @@ func TestEphemeralPauseResume(t *testing.T) { time.Sleep(time.Second * 3) + t.Run("ephemeralResume", resumePipes("test-local")) t.Run("ephemeralPause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) t.Run("ephemeralResume", resumePipes("test-local")) t.Run("pushToEnabledPipe", pushToPipe("test-local")) - time.Sleep(time.Second * 1) stopCh <- struct{}{} + time.Sleep(time.Second) wg.Wait() } @@ -319,6 +331,12 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -394,6 +412,135 @@ func TestEphemeralJobsError(t *testing.T) { wg.Wait() } +func TestEphemeralStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + + time.Sleep(time.Second) + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(1)) + assert.Equal(t, out.Delayed, int64(1)) + assert.Equal(t, out.Reserved, int64(0)) + + time.Sleep(time.Second) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(0)) + assert.Equal(t, out.Delayed, int64(0)) + assert.Equal(t, out.Reserved, int64(0)) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func declareEphemeralPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 22b80157..5d8d8d9c 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -1,6 +1,8 @@ package jobs import ( + "io/ioutil" + "net/http" "os" "os/signal" "sync" @@ -15,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -119,3 +122,125 @@ func TestJobsInit(t *testing.T) { stopCh <- struct{}{} wg.Wait() } + +func TestJOBSMetrics(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = "configs/.rr-jobs-metrics.yaml" + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &server.Plugin{}, + &jobs.Plugin{}, + &metrics.Plugin{}, + &ephemeral.Plugin{}, + mockLogger, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + + go func() { + defer tt.Stop() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 5) + + genericOut, err := get() + assert.NoError(t, err) + + assert.Contains(t, genericOut, `rr_jobs_jobs_err 0`) + assert.Contains(t, genericOut, `rr_jobs_jobs_ok 3`) + assert.Contains(t, genericOut, `rr_jobs_push_err 0`) + assert.Contains(t, genericOut, `rr_jobs_push_ok 3`) + assert.Contains(t, genericOut, "workers_memory_bytes") + + close(sig) + time.Sleep(time.Second * 2) +} + +const getAddr = "http://127.0.0.1:2112/metrics" + +// get request and return body +func get() (string, error) { + r, err := http.Get(getAddr) + if err != nil { + return "", err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", err + } + + err = r.Body.Close() + if err != nil { + return "", err + } + // unsafe + return string(b), err +} diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 80c5e8f4..839e1aaa 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" @@ -51,7 +52,8 @@ func TestSQSInit(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("sqs listener stopped").Times(2) + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() err = cont.RegisterAll( cfg, @@ -133,10 +135,15 @@ func TestSQSDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("sqs listener stopped").Times(1) + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() err = cont.RegisterAll( cfg, @@ -231,9 +238,14 @@ func TestSQSJobsError(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("sqs listener stopped").Times(1) + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() err = cont.RegisterAll( cfg, @@ -339,6 +351,136 @@ func TestSQSNoGlobalSection(t *testing.T) { require.Error(t, err) } +func TestSQSStat(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareSQSPipeline", declareSQSPipe) + t.Run("ConsumeSQSPipeline", resumePipes("test-3")) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseSQSPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "sqs") + assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "sqs") + assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + func declareSQSPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index 627b32f0..f50d34cc 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -56,6 +56,12 @@ func TestDurabilityAMQP(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-1", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-2", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) // redial errors @@ -176,13 +182,16 @@ func TestDurabilitySQS(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() - mockLogger.EXPECT().Warn("sqs listener stopped").MinTimes(2) - + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) // redial errors mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes() + + // stop + mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() + mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() err = cont.RegisterAll( cfg, @@ -246,8 +255,9 @@ func TestDurabilitySQS(t *testing.T) { time.Sleep(time.Second * 3) go func() { - time.Sleep(time.Second * 2) + time.Sleep(time.Second) t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + time.Sleep(time.Second) t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) }() @@ -291,9 +301,12 @@ func TestDurabilityBeanstalk(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) // redial errors mockLogger.EXPECT().Info("beanstalk redial was successful").MinTimes(2) mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() diff --git a/tests/plugins/metrics/docker-compose.yml b/tests/plugins/metrics/docker-compose.yml deleted file mode 100644 index 610633b4..00000000 --- a/tests/plugins/metrics/docker-compose.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: '3.7' - -services: - prometheus: - image: prom/prometheus - ports: - - 9090:9090 diff --git a/tests/plugins/reload/configs/.rr-reload-2.yaml b/tests/plugins/reload/configs/.rr-reload-2.yaml index fd1fe417..6a9d7582 100644 --- a/tests/plugins/reload/configs/.rr-reload-2.yaml +++ b/tests/plugins/reload/configs/.rr-reload-2.yaml @@ -27,7 +27,7 @@ logs: mode: development level: debug reload: - interval: 2s + interval: 1s patterns: - .txt services: diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go index b5223b9c..21c27e49 100644 --- a/tests/plugins/reload/reload_plugin_test.go +++ b/tests/plugins/reload/reload_plugin_test.go @@ -50,11 +50,11 @@ func TestReloadInit(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1) - mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1) - mockLogger.EXPECT().Info("HTTP handler listeners successfully re-added").Times(1) - mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1) + mockLogger.EXPECT().Info("HTTP handler listeners successfully re-added").MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() // placeholder for the workerlogerror err = cont.RegisterAll( @@ -258,10 +258,10 @@ func TestReloadFilterFileExt(t *testing.T) { mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(100) mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1) - mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1) mockLogger.EXPECT().Info("HTTP handler listeners successfully re-added").MinTimes(1) - mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() // placeholder for the workerlogerror err = cont.RegisterAll( @@ -317,6 +317,7 @@ func TestReloadFilterFileExt(t *testing.T) { time.Sleep(time.Second * 3) t.Run("ReloadMakeFiles", reloadMakeFiles) + time.Sleep(time.Second * 2) t.Run("ReloadFilteredExt", reloadFilteredExt) time.Sleep(time.Second * 10) @@ -460,10 +461,15 @@ func TestReloadCopy100(t *testing.T) { time.Sleep(time.Second * 3) t.Run("ReloadMake100Files", reloadMake100Files) + time.Sleep(time.Second * 2) t.Run("ReloadCopyFiles", reloadCopyFiles) + time.Sleep(time.Second * 2) t.Run("ReloadRecursiveDirsSupport", copyFilesRecursive) + time.Sleep(time.Second * 2) t.Run("RandomChangesInRecursiveDirs", randomChangesInRecursiveDirs) + time.Sleep(time.Second * 2) t.Run("RemoveFilesSupport", removeFilesSupport) + time.Sleep(time.Second * 2) t.Run("ReloadMoveSupport", reloadMoveSupport) time.Sleep(time.Second * 10) @@ -733,7 +739,7 @@ func TestReloadNoRecursion(t *testing.T) { time.Sleep(time.Second * 3) t.Run("ReloadMakeFiles", reloadMakeFiles) // make files in the testDir - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 2) t.Run("ReloadCopyFilesRecursive", reloadCopyFiles) time.Sleep(time.Second * 3) assert.NoError(t, freeResources(testDir)) diff --git a/tests/plugins/resetter/resetter_test.go b/tests/plugins/resetter/resetter_test.go index 465d22dd..10f38a9c 100644 --- a/tests/plugins/resetter/resetter_test.go +++ b/tests/plugins/resetter/resetter_test.go @@ -41,11 +41,6 @@ func TestResetterInit(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("started List method").MinTimes(1) - mockLogger.EXPECT().Debug("services list", "services", []string{"resetter.plugin1"}).MinTimes(1) - mockLogger.EXPECT().Debug("finished List method").MinTimes(1) - mockLogger.EXPECT().Debug("started Reset method for the service", "service", "resetter.plugin1").MinTimes(1) - mockLogger.EXPECT().Debug("finished Reset method for the service", "service", "resetter.plugin1").MinTimes(1) mockLogger.EXPECT().Warn("listener accept error, connection closed", "error", gomock.Any()).AnyTimes() err = cont.RegisterAll( diff --git a/tests/plugins/service/service_plugin_test.go b/tests/plugins/service/service_plugin_test.go index 8948a458..ddf54520 100644 --- a/tests/plugins/service/service_plugin_test.go +++ b/tests/plugins/service/service_plugin_test.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package service |