summaryrefslogtreecommitdiff
path: root/tests/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins')
-rw-r--r--tests/plugins/kv/storage_plugin_test.go632
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go79
2 files changed, 346 insertions, 365 deletions
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index b4122e8a..1bcb3455 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -10,7 +10,6 @@ import (
"testing"
"time"
- flatbuffers "github.com/google/flatbuffers/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -19,57 +18,12 @@ import (
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/stretchr/testify/assert"
)
-func makePayload(b *flatbuffers.Builder, storage string, items []kv.Item) []byte {
- b.Reset()
-
- storageOffset := b.CreateString(storage)
-
- // //////////////////// ITEMS VECTOR ////////////////////////////
- offset := make([]flatbuffers.UOffsetT, len(items))
- for i := len(items) - 1; i >= 0; i-- {
- offset[i] = serializeItems(b, items[i])
- }
-
- generated.PayloadStartItemsVector(b, len(offset))
-
- for i := len(offset) - 1; i >= 0; i-- {
- b.PrependUOffsetT(offset[i])
- }
-
- itemsOffset := b.EndVector(len(offset))
- // /////////////////////////////////////////////////////////////////
-
- generated.PayloadStart(b)
- generated.PayloadAddItems(b, itemsOffset)
- generated.PayloadAddStorage(b, storageOffset)
-
- finalOffset := generated.PayloadEnd(b)
-
- b.Finish(finalOffset)
-
- return b.Bytes[b.Head():]
-}
-
-func serializeItems(b *flatbuffers.Builder, item kv.Item) flatbuffers.UOffsetT {
- key := b.CreateString(item.Key)
- val := b.CreateString(item.Value)
- ttl := b.CreateString(item.TTL)
-
- generated.ItemStart(b)
-
- generated.ItemAddKey(b, key)
- generated.ItemAddValue(b, val)
- generated.ItemAddTimeout(b, ttl)
-
- return generated.ItemEnd(b)
-}
-
func TestKVInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
@@ -154,17 +108,19 @@ func kvSetTest(t *testing.T) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
- b := flatbuffers.NewBuilder(100)
- args := makePayload(b, "boltdb-south", []kv.Item{
- {
- Key: "key",
- Value: "val",
+ p := &payload.Payload{
+ Storage: "boltdb-south",
+ Items: []*payload.Item{
+ {
+ Key: "key",
+ Value: "val",
+ },
},
- })
+ }
var ok bool
- err = client.Call("kv.Set", args, &ok)
+ err = client.Call("kv.Set", p, &ok)
assert.NoError(t, err)
assert.True(t, ok, "Set return result")
}
@@ -175,16 +131,19 @@ func kvHasTest(t *testing.T) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
- b := flatbuffers.NewBuilder(100)
- args := makePayload(b, "boltdb-south", []kv.Item{
- {
- Key: "key",
- Value: "val",
+ p := &payload.Payload{
+ Storage: "boltdb-south",
+ Items: []*payload.Item{
+ {
+ Key: "key",
+ Value: "val",
+ },
},
- })
+ }
+
var ret map[string]bool
- err = client.Call("kv.Has", args, &ret)
+ err = client.Call("kv.Has", p, &ret)
assert.NoError(t, err)
}
@@ -250,7 +209,7 @@ func TestBoltDb(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("testBoltDbRPCMethods", testRPCMethods)
+ t.Run("BOLTDB", testRPCMethods)
stopCh <- struct{}{}
wg.Wait()
@@ -264,40 +223,48 @@ func testRPCMethods(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
+
+ keys := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -327,20 +294,24 @@ func testRPCMethods(t *testing.T) {
assert.Equal(t, "bb", mGet["b"].(string))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
- },
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
+
+ data2 := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -349,17 +320,21 @@ func testRPCMethods(t *testing.T) {
assert.True(t, mExpRes)
// TTL
- keys2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "d",
+ keys2 := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.NoError(t, err)
@@ -373,11 +348,15 @@ func testRPCMethods(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
@@ -464,40 +443,48 @@ func testRPCMethodsMemcached(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
+
+ keys := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -527,20 +514,24 @@ func testRPCMethodsMemcached(t *testing.T) {
assert.Equal(t, string("bb"), string(mGet["b"].([]byte)))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
- },
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
+
+ data2 := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -549,17 +540,21 @@ func testRPCMethodsMemcached(t *testing.T) {
assert.True(t, mExpRes)
// TTL call is not supported for the memcached driver
- keys2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
+ keys2 := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.Error(t, err)
@@ -573,11 +568,15 @@ func testRPCMethodsMemcached(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
@@ -664,40 +663,47 @@ func testRPCMethodsInMemory(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
+ keys := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
- },
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -727,20 +733,24 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.Equal(t, "bb", mGet["b"].(string))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
- },
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
+
+ data2 := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -749,17 +759,21 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.True(t, mExpRes)
// TTL
- keys2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
+ keys2 := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "b",
- },
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.NoError(t, err)
@@ -773,11 +787,15 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
@@ -864,40 +882,47 @@ func testRPCMethodsRedis(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
+ keys := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -927,20 +952,23 @@ func testRPCMethodsRedis(t *testing.T) {
assert.Equal(t, "bb", mGet["b"].(string))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
+ data2 := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
- },
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -949,17 +977,21 @@ func testRPCMethodsRedis(t *testing.T) {
assert.True(t, mExpRes)
// TTL
- keys2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
+ keys2 := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "b",
- },
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.NoError(t, err)
@@ -973,11 +1005,15 @@ func testRPCMethodsRedis(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index b2c756bf..5f472106 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -13,11 +13,9 @@ import (
"time"
"github.com/fasthttp/websocket"
- flatbuffers "github.com/google/flatbuffers/go"
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
@@ -877,8 +875,8 @@ func publish2(command string, broker string, topics ...string) {
panic(err)
}
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *Msg {
- return &Msg{
+func messageWS(command string, broker string, payload []byte, topics ...string) *message.Message {
+ return &message.Message{
Topics: topics,
Command: command,
Broker: broker,
@@ -886,70 +884,17 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) []byte {
- m := []pubsub.Message{
- {
- Topics: topics,
- Command: command,
- Broker: broker,
- Payload: payload,
+func makeMessage(command string, broker string, payload []byte, topics ...string) *message.Messages {
+ m := &message.Messages{
+ Messages: []*message.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
},
}
- b := flatbuffers.NewBuilder(1)
-
- return msgs(b, m)
-}
-
-func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT {
- cmdOff := b.CreateString(msg.Command)
- brokerOff := b.CreateString(msg.Broker)
-
- offsets := make([]flatbuffers.UOffsetT, len(msg.Topics))
- for j := len(msg.Topics) - 1; j >= 0; j-- {
- offsets[j] = b.CreateString(msg.Topics[j])
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- pOff := b.CreateByteVector(msg.Payload)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- return message.MessageEnd(b)
-}
-
-func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte {
- b.Reset()
-
- mOff := make([]flatbuffers.UOffsetT, len(msgs))
-
- for i := len(msgs) - 1; i >= 0; i-- {
- mOff[i] = serializeMsg(b, msgs[i])
- }
-
- message.MessagesStartMessagesVector(b, len(mOff))
-
- for i := len(mOff) - 1; i >= 0; i-- {
- b.PrependUOffsetT(mOff[i])
- }
-
- msgsOff := b.EndVector(len(msgs))
-
- message.MessagesStart(b)
- message.MessagesAddMessages(b, msgsOff)
- fOff := message.MessagesEnd(b)
- b.Finish(fOff)
-
- return b.Bytes[b.Head():]
+ return m
}