diff options
Diffstat (limited to 'tests/plugins/broadcast/broadcast_plugin_test.go')
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 36 |
1 files changed, 36 insertions, 0 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index a78b17e1..3dcc5c2c 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -1,6 +1,7 @@ package broadcast import ( + "context" "net" "net/rpc" "os" @@ -10,6 +11,7 @@ import ( "testing" "time" + goRedis "github.com/go-redis/redis/v8" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" @@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) @@ -274,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) time.Sleep(time.Second * 5) @@ -283,9 +291,17 @@ func TestBroadcastSameSubscriber(t *testing.T) { stopCh <- struct{}{} wg.Wait() + + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) @@ -384,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) time.Sleep(time.Second * 4) @@ -393,7 +412,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) + + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { @@ -446,6 +469,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) { } } } + func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:"+port) @@ -475,3 +499,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request { return m } + +func redisFlushAll(addr string) func(t *testing.T) { + return func(t *testing.T) { + rdb := goRedis.NewClient(&goRedis.Options{ + Addr: addr, + Password: "", // no password set + DB: 0, // use default DB + }) + + rdb.FlushAll(context.Background()) + } +} |