From 12c031ce76c505128ebf9daafa91952855f202d4 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 2 Jun 2021 17:25:09 +0300 Subject: - Switch from the json to flatbuffers Signed-off-by: Valery Piashchynski --- tests/plugins/websockets/websocket_plugin_test.go | 130 +++++++++++++--------- 1 file changed, 75 insertions(+), 55 deletions(-) (limited to 'tests/plugins/websockets/websocket_plugin_test.go') diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index f5289752..b2c756bf 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -18,7 +18,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/pkg/pubsub" - message2 "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -32,7 +32,7 @@ import ( ) type Msg struct { - // Topic message been pushed into. + // Topic makeMessage been pushed into. Topics []string `json:"topic"` // Command (join, leave, headers) @@ -134,7 +134,7 @@ func wsInit(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -244,7 +244,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -261,14 +261,14 @@ func RPCWsMemoryPubAsync(t *testing.T) { publishAsync("", "memory", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -291,7 +291,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { publishAsync2("", "memory", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -318,7 +318,7 @@ func RPCWsMemory(t *testing.T) { } }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -335,14 +335,14 @@ func RPCWsMemory(t *testing.T) { publish("", "memory", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -365,7 +365,7 @@ func RPCWsMemory(t *testing.T) { publish2("", "memory", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -390,7 +390,7 @@ func RPCWsRedis(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -407,14 +407,14 @@ func RPCWsRedis(t *testing.T) { publish("", "redis", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -437,7 +437,7 @@ func RPCWsRedis(t *testing.T) { publish2("", "redis", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -540,7 +540,7 @@ func RPCWsMemoryDeny(t *testing.T) { } }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -556,7 +556,7 @@ func RPCWsMemoryDeny(t *testing.T) { assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -761,7 +761,7 @@ func RPCWsMemoryAllow(t *testing.T) { } }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -778,14 +778,14 @@ func RPCWsMemoryAllow(t *testing.T) { publish("", "memory", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -808,7 +808,7 @@ func RPCWsMemoryAllow(t *testing.T) { publish2("", "memory", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -827,7 +827,7 @@ func publish(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret) if err != nil { panic(err) } @@ -842,7 +842,7 @@ func publishAsync(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret) if err != nil { panic(err) } @@ -857,7 +857,7 @@ func publishAsync2(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret) if err != nil { panic(err) } @@ -872,13 +872,12 @@ func publish2(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret) if err != nil { panic(err) } } - -func message(command string, broker string, payload []byte, topics ...string) *Msg { +func messageWS(command string, broker string, payload []byte, topics ...string) *Msg { return &Msg{ Topics: topics, Command: command, @@ -887,49 +886,70 @@ func message(command string, broker string, payload []byte, topics ...string) *M } } -func makePayload(b *flatbuffers.Builder, storage string, items []pubsub.Message) []byte { - b.Reset() +func makeMessage(command string, broker string, payload []byte, topics ...string) []byte { + m := []pubsub.Message{ + { + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + }, + } - storageOffset := b.CreateString(storage) + b := flatbuffers.NewBuilder(1) - // //////////////////// ITEMS VECTOR //////////////////////////// - offset := make([]flatbuffers.UOffsetT, len(items)) - for i := len(items) - 1; i >= 0; i-- { - offset[i] = serializeItems(b, items[i]) - } + return msgs(b, m) +} - message2.MessageStartTopicsVector(b, len(offset)) +func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT { + cmdOff := b.CreateString(msg.Command) + brokerOff := b.CreateString(msg.Broker) - for i := len(offset) - 1; i >= 0; i-- { - b.PrependUOffsetT(offset[i]) + offsets := make([]flatbuffers.UOffsetT, len(msg.Topics)) + for j := len(msg.Topics) - 1; j >= 0; j-- { + offsets[j] = b.CreateString(msg.Topics[j]) } - itemsOffset := b.EndVector(len(offset)) - // ///////////////////////////////////////////////////////////////// + message.MessageStartTopicsVector(b, len(offsets)) - message2.MessageStart(b) - message2.MessagesAddMessages(b, itemsOffset) - message2.PayloadAddStorage(b, storageOffset) + for j := len(offsets) - 1; j >= 0; j-- { + b.PrependUOffsetT(offsets[j]) + } - finalOffset := message2.PayloadEnd(b) + tOff := b.EndVector(len(offsets)) + pOff := b.CreateByteVector(msg.Payload) - b.Finish(finalOffset) + message.MessageStart(b) - return b.Bytes[b.Head():] + message.MessageAddCommand(b, cmdOff) + message.MessageAddBroker(b, brokerOff) + message.MessageAddTopics(b, tOff) + message.MessageAddPayload(b, pOff) + + return message.MessageEnd(b) } -func serializeItems(b *flatbuffers.Builder, item pubsub.Message) flatbuffers.UOffsetT { - br := b.CreateString(item.Broker) - cmd := b.CreateString(item.Command) - payload := b.CreateByteVector(item.Payload) +func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte { + b.Reset() + mOff := make([]flatbuffers.UOffsetT, len(msgs)) + for i := len(msgs) - 1; i >= 0; i-- { + mOff[i] = serializeMsg(b, msgs[i]) + } - message2.MessageStart(b) + message.MessagesStartMessagesVector(b, len(mOff)) - message2.MessageAddBroker(b, br) - message2.MessageAddCommand(b, cmd) - message2.MessageAddPayload(b, payload) + for i := len(mOff) - 1; i >= 0; i-- { + b.PrependUOffsetT(mOff[i]) + } - return message2.MessageEnd(b) + msgsOff := b.EndVector(len(msgs)) + + message.MessagesStart(b) + message.MessagesAddMessages(b, msgsOff) + fOff := message.MessagesEnd(b) + b.Finish(fOff) + + return b.Bytes[b.Head():] } -- cgit v1.2.3