diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 22:36:40 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-08 22:36:40 +0300 |
commit | 4f3e16892479db4bd8280a46987f3105e46e5c96 (patch) | |
tree | 13c4c3f380d8309b95c9600cc2000d1d5ab87cda /tests | |
parent | 49ce25e80ba99ac91bce7ea2b9b632de53e07c0d (diff) | |
parent | cc271dceb13d3929f0382311dfce3dfed2ce04ce (diff) |
#712 feat(ws, kv): switch from the `flatbuffers` to the `protobuf`v2.3.0-beta.2
#712 feat(ws, kv): switch from the `flatbuffers` to the `protobuf`
Diffstat (limited to 'tests')
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 634 | ||||
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 88 |
2 files changed, 348 insertions, 374 deletions
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index b4122e8a..760b6951 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -10,66 +10,20 @@ import ( "testing" "time" - flatbuffers "github.com/google/flatbuffers/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis" - "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/stretchr/testify/assert" ) -func makePayload(b *flatbuffers.Builder, storage string, items []kv.Item) []byte { - b.Reset() - - storageOffset := b.CreateString(storage) - - // //////////////////// ITEMS VECTOR //////////////////////////// - offset := make([]flatbuffers.UOffsetT, len(items)) - for i := len(items) - 1; i >= 0; i-- { - offset[i] = serializeItems(b, items[i]) - } - - generated.PayloadStartItemsVector(b, len(offset)) - - for i := len(offset) - 1; i >= 0; i-- { - b.PrependUOffsetT(offset[i]) - } - - itemsOffset := b.EndVector(len(offset)) - // ///////////////////////////////////////////////////////////////// - - generated.PayloadStart(b) - generated.PayloadAddItems(b, itemsOffset) - generated.PayloadAddStorage(b, storageOffset) - - finalOffset := generated.PayloadEnd(b) - - b.Finish(finalOffset) - - return b.Bytes[b.Head():] -} - -func serializeItems(b *flatbuffers.Builder, item kv.Item) flatbuffers.UOffsetT { - key := b.CreateString(item.Key) - val := b.CreateString(item.Value) - ttl := b.CreateString(item.TTL) - - generated.ItemStart(b) - - generated.ItemAddKey(b, key) - generated.ItemAddValue(b, val) - generated.ItemAddTimeout(b, ttl) - - return generated.ItemEnd(b) -} - func TestKVInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -153,18 +107,19 @@ func kvSetTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - - b := flatbuffers.NewBuilder(100) - args := makePayload(b, "boltdb-south", []kv.Item{ - { - Key: "key", - Value: "val", + p := &payload.Payload{ + Storage: "boltdb-south", + Items: []*payload.Item{ + { + Key: "key", + Value: "val", + }, }, - }) + } var ok bool - err = client.Call("kv.Set", args, &ok) + err = client.Call("kv.Set", p, &ok) assert.NoError(t, err) assert.True(t, ok, "Set return result") } @@ -174,17 +129,19 @@ func kvHasTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - - b := flatbuffers.NewBuilder(100) - args := makePayload(b, "boltdb-south", []kv.Item{ - { - Key: "key", - Value: "val", + p := &payload.Payload{ + Storage: "boltdb-south", + Items: []*payload.Item{ + { + Key: "key", + Value: "val", + }, }, - }) + } + var ret map[string]bool - err = client.Call("kv.Has", args, &ret) + err = client.Call("kv.Has", p, &ret) assert.NoError(t, err) } @@ -250,7 +207,7 @@ func TestBoltDb(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("testBoltDbRPCMethods", testRPCMethods) + t.Run("BOLTDB", testRPCMethods) stopCh <- struct{}{} wg.Wait() @@ -264,40 +221,48 @@ func testRPCMethods(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", + + keys := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -327,20 +292,24 @@ func testRPCMethods(t *testing.T) { assert.Equal(t, "bb", mGet["b"].(string)) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, - }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, + + data2 := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - }) + } // MEXPIRE var mExpRes bool @@ -349,17 +318,21 @@ func testRPCMethods(t *testing.T) { assert.True(t, mExpRes) // TTL - keys2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", + keys2 := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.NoError(t, err) @@ -373,11 +346,15 @@ func testRPCMethods(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) @@ -464,40 +441,48 @@ func testRPCMethodsMemcached(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", + + keys := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -527,20 +512,24 @@ func testRPCMethodsMemcached(t *testing.T) { assert.Equal(t, string("bb"), string(mGet["b"].([]byte))) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, - }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, + + data2 := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - }) + } // MEXPIRE var mExpRes bool @@ -549,17 +538,21 @@ func testRPCMethodsMemcached(t *testing.T) { assert.True(t, mExpRes) // TTL call is not supported for the memcached driver - keys2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", + keys2 := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "b", - }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.Error(t, err) @@ -573,11 +566,15 @@ func testRPCMethodsMemcached(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) @@ -664,40 +661,47 @@ func testRPCMethodsInMemory(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", + keys := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", - }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -727,20 +731,24 @@ func testRPCMethodsInMemory(t *testing.T) { assert.Equal(t, "bb", mGet["b"].(string)) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, - }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, + + data2 := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - }) + } // MEXPIRE var mExpRes bool @@ -749,17 +757,21 @@ func testRPCMethodsInMemory(t *testing.T) { assert.True(t, mExpRes) // TTL - keys2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", + keys2 := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.NoError(t, err) @@ -773,11 +785,15 @@ func testRPCMethodsInMemory(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) @@ -864,40 +880,47 @@ func testRPCMethodsRedis(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", + keys := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -927,20 +950,23 @@ func testRPCMethodsRedis(t *testing.T) { assert.Equal(t, "bb", mGet["b"].(string)) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, + data2 := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, - }, - }) + } // MEXPIRE var mExpRes bool @@ -949,17 +975,21 @@ func testRPCMethodsRedis(t *testing.T) { assert.True(t, mExpRes) // TTL - keys2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", + keys2 := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "b", - }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.NoError(t, err) @@ -973,11 +1003,15 @@ func testRPCMethodsRedis(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index b2c756bf..4e4c09f1 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -13,16 +13,13 @@ import ( "time" "github.com/fasthttp/websocket" - flatbuffers "github.com/google/flatbuffers/go" json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -170,7 +167,6 @@ func TestWSRedisAndMemory(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -220,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) { }() time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) t.Run("RPCWsMemory", RPCWsMemory) t.Run("RPCWsRedis", RPCWsRedis) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) stopCh <- struct{}{} @@ -464,7 +460,6 @@ func TestWSMemoryDeny(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -592,7 +587,6 @@ func TestWSMemoryStop(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -685,7 +679,6 @@ func TestWSMemoryOk(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -877,8 +870,8 @@ func publish2(command string, broker string, topics ...string) { panic(err) } } -func messageWS(command string, broker string, payload []byte, topics ...string) *Msg { - return &Msg{ +func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { + return &websocketsv1.Message{ Topics: topics, Command: command, Broker: broker, @@ -886,70 +879,17 @@ func messageWS(command string, broker string, payload []byte, topics ...string) } } -func makeMessage(command string, broker string, payload []byte, topics ...string) []byte { - m := []pubsub.Message{ - { - Topics: topics, - Command: command, - Broker: broker, - Payload: payload, +func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages { + m := &websocketsv1.Messages{ + Messages: []*websocketsv1.Message{ + { + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + }, }, } - b := flatbuffers.NewBuilder(1) - - return msgs(b, m) -} - -func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT { - cmdOff := b.CreateString(msg.Command) - brokerOff := b.CreateString(msg.Broker) - - offsets := make([]flatbuffers.UOffsetT, len(msg.Topics)) - for j := len(msg.Topics) - 1; j >= 0; j-- { - offsets[j] = b.CreateString(msg.Topics[j]) - } - - message.MessageStartTopicsVector(b, len(offsets)) - - for j := len(offsets) - 1; j >= 0; j-- { - b.PrependUOffsetT(offsets[j]) - } - - tOff := b.EndVector(len(offsets)) - pOff := b.CreateByteVector(msg.Payload) - - message.MessageStart(b) - - message.MessageAddCommand(b, cmdOff) - message.MessageAddBroker(b, brokerOff) - message.MessageAddTopics(b, tOff) - message.MessageAddPayload(b, pOff) - - return message.MessageEnd(b) -} - -func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte { - b.Reset() - - mOff := make([]flatbuffers.UOffsetT, len(msgs)) - - for i := len(msgs) - 1; i >= 0; i-- { - mOff[i] = serializeMsg(b, msgs[i]) - } - - message.MessagesStartMessagesVector(b, len(mOff)) - - for i := len(mOff) - 1; i >= 0; i-- { - b.PrependUOffsetT(mOff[i]) - } - - msgsOff := b.EndVector(len(msgs)) - - message.MessagesStart(b) - message.MessagesAddMessages(b, msgsOff) - fOff := message.MessagesEnd(b) - b.Finish(fOff) - - return b.Bytes[b.Head():] + return m } |