summaryrefslogtreecommitdiff
path: root/tests/plugins/websockets/websocket_plugin_test.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
committerValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
commit12c031ce76c505128ebf9daafa91952855f202d4 (patch)
tree51846c0cd8a452246e383deb2ac00cce9ef1b92c /tests/plugins/websockets/websocket_plugin_test.go
parent352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff)
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/websockets/websocket_plugin_test.go')
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go130
1 files changed, 75 insertions, 55 deletions
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index f5289752..b2c756bf 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -18,7 +18,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- message2 "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -32,7 +32,7 @@ import (
)
type Msg struct {
- // Topic message been pushed into.
+ // Topic makeMessage been pushed into.
Topics []string `json:"topic"`
// Command (join, leave, headers)
@@ -134,7 +134,7 @@ func wsInit(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -244,7 +244,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -261,14 +261,14 @@ func RPCWsMemoryPubAsync(t *testing.T) {
publishAsync("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -291,7 +291,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
publishAsync2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -318,7 +318,7 @@ func RPCWsMemory(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -335,14 +335,14 @@ func RPCWsMemory(t *testing.T) {
publish("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -365,7 +365,7 @@ func RPCWsMemory(t *testing.T) {
publish2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -390,7 +390,7 @@ func RPCWsRedis(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -407,14 +407,14 @@ func RPCWsRedis(t *testing.T) {
publish("", "redis", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -437,7 +437,7 @@ func RPCWsRedis(t *testing.T) {
publish2("", "redis", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -540,7 +540,7 @@ func RPCWsMemoryDeny(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -556,7 +556,7 @@ func RPCWsMemoryDeny(t *testing.T) {
assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -761,7 +761,7 @@ func RPCWsMemoryAllow(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -778,14 +778,14 @@ func RPCWsMemoryAllow(t *testing.T) {
publish("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -808,7 +808,7 @@ func RPCWsMemoryAllow(t *testing.T) {
publish2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -827,7 +827,7 @@ func publish(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -842,7 +842,7 @@ func publishAsync(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -857,7 +857,7 @@ func publishAsync2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -872,13 +872,12 @@ func publish2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
if err != nil {
panic(err)
}
}
-
-func message(command string, broker string, payload []byte, topics ...string) *Msg {
+func messageWS(command string, broker string, payload []byte, topics ...string) *Msg {
return &Msg{
Topics: topics,
Command: command,
@@ -887,49 +886,70 @@ func message(command string, broker string, payload []byte, topics ...string) *M
}
}
-func makePayload(b *flatbuffers.Builder, storage string, items []pubsub.Message) []byte {
- b.Reset()
+func makeMessage(command string, broker string, payload []byte, topics ...string) []byte {
+ m := []pubsub.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
+ }
- storageOffset := b.CreateString(storage)
+ b := flatbuffers.NewBuilder(1)
- // //////////////////// ITEMS VECTOR ////////////////////////////
- offset := make([]flatbuffers.UOffsetT, len(items))
- for i := len(items) - 1; i >= 0; i-- {
- offset[i] = serializeItems(b, items[i])
- }
+ return msgs(b, m)
+}
- message2.MessageStartTopicsVector(b, len(offset))
+func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT {
+ cmdOff := b.CreateString(msg.Command)
+ brokerOff := b.CreateString(msg.Broker)
- for i := len(offset) - 1; i >= 0; i-- {
- b.PrependUOffsetT(offset[i])
+ offsets := make([]flatbuffers.UOffsetT, len(msg.Topics))
+ for j := len(msg.Topics) - 1; j >= 0; j-- {
+ offsets[j] = b.CreateString(msg.Topics[j])
}
- itemsOffset := b.EndVector(len(offset))
- // /////////////////////////////////////////////////////////////////
+ message.MessageStartTopicsVector(b, len(offsets))
- message2.MessageStart(b)
- message2.MessagesAddMessages(b, itemsOffset)
- message2.PayloadAddStorage(b, storageOffset)
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
- finalOffset := message2.PayloadEnd(b)
+ tOff := b.EndVector(len(offsets))
+ pOff := b.CreateByteVector(msg.Payload)
- b.Finish(finalOffset)
+ message.MessageStart(b)
- return b.Bytes[b.Head():]
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ return message.MessageEnd(b)
}
-func serializeItems(b *flatbuffers.Builder, item pubsub.Message) flatbuffers.UOffsetT {
- br := b.CreateString(item.Broker)
- cmd := b.CreateString(item.Command)
- payload := b.CreateByteVector(item.Payload)
+func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte {
+ b.Reset()
+ mOff := make([]flatbuffers.UOffsetT, len(msgs))
+ for i := len(msgs) - 1; i >= 0; i-- {
+ mOff[i] = serializeMsg(b, msgs[i])
+ }
- message2.MessageStart(b)
+ message.MessagesStartMessagesVector(b, len(mOff))
- message2.MessageAddBroker(b, br)
- message2.MessageAddCommand(b, cmd)
- message2.MessageAddPayload(b, payload)
+ for i := len(mOff) - 1; i >= 0; i-- {
+ b.PrependUOffsetT(mOff[i])
+ }
- return message2.MessageEnd(b)
+ msgsOff := b.EndVector(len(msgs))
+
+ message.MessagesStart(b)
+ message.MessagesAddMessages(b, msgsOff)
+ fOff := message.MessagesEnd(b)
+ b.Finish(fOff)
+
+ return b.Bytes[b.Head():]
}