diff options
Diffstat (limited to 'tests/plugins/websockets/websocket_plugin_test.go')
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 88 |
1 files changed, 14 insertions, 74 deletions
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index b2c756bf..4e4c09f1 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -13,16 +13,13 @@ import ( "time" "github.com/fasthttp/websocket" - flatbuffers "github.com/google/flatbuffers/go" json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -170,7 +167,6 @@ func TestWSRedisAndMemory(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -220,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) { }() time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) t.Run("RPCWsMemory", RPCWsMemory) t.Run("RPCWsRedis", RPCWsRedis) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) stopCh <- struct{}{} @@ -464,7 +460,6 @@ func TestWSMemoryDeny(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -592,7 +587,6 @@ func TestWSMemoryStop(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -685,7 +679,6 @@ func TestWSMemoryOk(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -877,8 +870,8 @@ func publish2(command string, broker string, topics ...string) { panic(err) } } -func messageWS(command string, broker string, payload []byte, topics ...string) *Msg { - return &Msg{ +func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { + return &websocketsv1.Message{ Topics: topics, Command: command, Broker: broker, @@ -886,70 +879,17 @@ func messageWS(command string, broker string, payload []byte, topics ...string) } } -func makeMessage(command string, broker string, payload []byte, topics ...string) []byte { - m := []pubsub.Message{ - { - Topics: topics, - Command: command, - Broker: broker, - Payload: payload, +func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages { + m := &websocketsv1.Messages{ + Messages: []*websocketsv1.Message{ + { + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + }, }, } - b := flatbuffers.NewBuilder(1) - - return msgs(b, m) -} - -func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT { - cmdOff := b.CreateString(msg.Command) - brokerOff := b.CreateString(msg.Broker) - - offsets := make([]flatbuffers.UOffsetT, len(msg.Topics)) - for j := len(msg.Topics) - 1; j >= 0; j-- { - offsets[j] = b.CreateString(msg.Topics[j]) - } - - message.MessageStartTopicsVector(b, len(offsets)) - - for j := len(offsets) - 1; j >= 0; j-- { - b.PrependUOffsetT(offsets[j]) - } - - tOff := b.EndVector(len(offsets)) - pOff := b.CreateByteVector(msg.Payload) - - message.MessageStart(b) - - message.MessageAddCommand(b, cmdOff) - message.MessageAddBroker(b, brokerOff) - message.MessageAddTopics(b, tOff) - message.MessageAddPayload(b, pOff) - - return message.MessageEnd(b) -} - -func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte { - b.Reset() - - mOff := make([]flatbuffers.UOffsetT, len(msgs)) - - for i := len(msgs) - 1; i >= 0; i-- { - mOff[i] = serializeMsg(b, msgs[i]) - } - - message.MessagesStartMessagesVector(b, len(mOff)) - - for i := len(mOff) - 1; i >= 0; i-- { - b.PrependUOffsetT(mOff[i]) - } - - msgsOff := b.EndVector(len(msgs)) - - message.MessagesStart(b) - message.MessagesAddMessages(b, msgsOff) - fOff := message.MessagesEnd(b) - b.Finish(fOff) - - return b.Bytes[b.Head():] + return m } |