summaryrefslogtreecommitdiff
path: root/tests/plugins/broadcast/broadcast_plugin_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/broadcast/broadcast_plugin_test.go')
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go36
1 files changed, 36 insertions, 0 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index a78b17e1..3dcc5c2c 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -1,6 +1,7 @@
package broadcast
import (
+ "context"
"net"
"net/rpc"
"os"
@@ -10,6 +11,7 @@ import (
"testing"
"time"
+ goRedis "github.com/go-redis/redis/v8"
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
@@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -274,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
time.Sleep(time.Second * 5)
@@ -283,9 +291,17 @@ func TestBroadcastSameSubscriber(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -384,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
time.Sleep(time.Second * 4)
@@ -393,7 +412,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
time.Sleep(time.Second * 5)
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
@@ -446,6 +469,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) {
}
}
}
+
func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
@@ -475,3 +499,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
return m
}
+
+func redisFlushAll(addr string) func(t *testing.T) {
+ return func(t *testing.T) {
+ rdb := goRedis.NewClient(&goRedis.Options{
+ Addr: addr,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ })
+
+ rdb.FlushAll(context.Background())
+ }
+}