summaryrefslogtreecommitdiff
path: root/tests/plugins/websockets/websocket_plugin_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/websockets/websocket_plugin_test.go')
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go88
1 files changed, 14 insertions, 74 deletions
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index b2c756bf..4e4c09f1 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -13,16 +13,13 @@ import (
"time"
"github.com/fasthttp/websocket"
- flatbuffers "github.com/google/flatbuffers/go"
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -170,7 +167,6 @@ func TestWSRedisAndMemory(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -220,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) {
}()
time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
t.Run("RPCWsMemory", RPCWsMemory)
t.Run("RPCWsRedis", RPCWsRedis)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
stopCh <- struct{}{}
@@ -464,7 +460,6 @@ func TestWSMemoryDeny(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -592,7 +587,6 @@ func TestWSMemoryStop(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -685,7 +679,6 @@ func TestWSMemoryOk(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -877,8 +870,8 @@ func publish2(command string, broker string, topics ...string) {
panic(err)
}
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *Msg {
- return &Msg{
+func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+ return &websocketsv1.Message{
Topics: topics,
Command: command,
Broker: broker,
@@ -886,70 +879,17 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) []byte {
- m := []pubsub.Message{
- {
- Topics: topics,
- Command: command,
- Broker: broker,
- Payload: payload,
+func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages {
+ m := &websocketsv1.Messages{
+ Messages: []*websocketsv1.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
},
}
- b := flatbuffers.NewBuilder(1)
-
- return msgs(b, m)
-}
-
-func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT {
- cmdOff := b.CreateString(msg.Command)
- brokerOff := b.CreateString(msg.Broker)
-
- offsets := make([]flatbuffers.UOffsetT, len(msg.Topics))
- for j := len(msg.Topics) - 1; j >= 0; j-- {
- offsets[j] = b.CreateString(msg.Topics[j])
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- pOff := b.CreateByteVector(msg.Payload)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- return message.MessageEnd(b)
-}
-
-func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte {
- b.Reset()
-
- mOff := make([]flatbuffers.UOffsetT, len(msgs))
-
- for i := len(msgs) - 1; i >= 0; i-- {
- mOff[i] = serializeMsg(b, msgs[i])
- }
-
- message.MessagesStartMessagesVector(b, len(mOff))
-
- for i := len(mOff) - 1; i >= 0; i-- {
- b.PrependUOffsetT(mOff[i])
- }
-
- msgsOff := b.EndVector(len(msgs))
-
- message.MessagesStart(b)
- message.MessagesAddMessages(b, msgsOff)
- fOff := message.MessagesEnd(b)
- b.Finish(fOff)
-
- return b.Bytes[b.Head():]
+ return m
}