summaryrefslogtreecommitdiff
path: root/tests/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/websockets')
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go50
1 files changed, 50 insertions, 0 deletions
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 772b53ac..f5289752 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -13,9 +13,12 @@ import (
"time"
"github.com/fasthttp/websocket"
+ flatbuffers "github.com/google/flatbuffers/go"
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ message2 "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -883,3 +886,50 @@ func message(command string, broker string, payload []byte, topics ...string) *M
Payload: payload,
}
}
+
+func makePayload(b *flatbuffers.Builder, storage string, items []pubsub.Message) []byte {
+ b.Reset()
+
+ storageOffset := b.CreateString(storage)
+
+ // //////////////////// ITEMS VECTOR ////////////////////////////
+ offset := make([]flatbuffers.UOffsetT, len(items))
+ for i := len(items) - 1; i >= 0; i-- {
+ offset[i] = serializeItems(b, items[i])
+ }
+
+ message2.MessageStartTopicsVector(b, len(offset))
+
+ for i := len(offset) - 1; i >= 0; i-- {
+ b.PrependUOffsetT(offset[i])
+ }
+
+ itemsOffset := b.EndVector(len(offset))
+ // /////////////////////////////////////////////////////////////////
+
+ message2.MessageStart(b)
+ message2.MessagesAddMessages(b, itemsOffset)
+ message2.PayloadAddStorage(b, storageOffset)
+
+ finalOffset := message2.PayloadEnd(b)
+
+ b.Finish(finalOffset)
+
+ return b.Bytes[b.Head():]
+}
+
+func serializeItems(b *flatbuffers.Builder, item pubsub.Message) flatbuffers.UOffsetT {
+ br := b.CreateString(item.Broker)
+ cmd := b.CreateString(item.Command)
+ payload := b.CreateByteVector(item.Payload)
+
+
+
+ message2.MessageStart(b)
+
+ message2.MessageAddBroker(b, br)
+ message2.MessageAddCommand(b, cmd)
+ message2.MessageAddPayload(b, payload)
+
+ return message2.MessageEnd(b)
+}