summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-25 11:47:24 +0300
committerValery Piashchynski <[email protected]>2021-06-25 11:47:24 +0300
commit09041fe7499be1929aaffaa113f93f6853564c09 (patch)
treea608df4250439cf9a1511f9e00ff7af6a033579d /tests
parent64d41f6d87bb8002e700cdcddb5a3a241e4a2a7d (diff)
parente9249c7896331bab97a18a7ee0db17803fdd31fb (diff)
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
Diffstat (limited to 'tests')
-rw-r--r--tests/plugins/kv/storage_plugin_test.go189
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go123
2 files changed, 252 insertions, 60 deletions
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index 1e466e06..ced1c5fe 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -575,6 +575,53 @@ func testRPCMethods(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "boltdb-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
func TestMemcached(t *testing.T) {
@@ -790,6 +837,54 @@ func testRPCMethodsMemcached(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "memcached-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second * 2)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
func TestInMemory(t *testing.T) {
@@ -1004,6 +1099,53 @@ func testRPCMethodsInMemory(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "memory-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
func TestRedis(t *testing.T) {
@@ -1354,4 +1496,51 @@ func testRPCMethodsRedis(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "redis-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 5ed0c3f3..53b6a572 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -645,7 +645,7 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
return func(t *testing.T) {
da := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 18,
+ HandshakeTimeout: time.Second * 20,
}
connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"}
@@ -654,9 +654,32 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
assert.NoError(t, err)
defer func() {
- _ = resp.Body.Close()
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ go func() {
+ messagesToVerify := make([]string, 0, 10)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`)
+ i := 0
+ for {
+ _, msg, err2 := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err2)
+ assert.Equal(t, messagesToVerify[i], retMsg)
+ i++
+ if i == 3 {
+ return
+ }
+ }
}()
+ time.Sleep(time.Second)
+
d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
@@ -665,20 +688,11 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
+ time.Sleep(time.Second)
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+ publishAsync(t, "foo")
- publishAsync(t, "placeholder", "foo")
-
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+ time.Sleep(time.Second)
// //// LEAVE foo /////////
d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
@@ -689,27 +703,16 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+ time.Sleep(time.Second)
// TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publishAsync(t, "placeholder", "foo")
+ publishAsync(t, "foo")
go func() {
- time.Sleep(time.Second * 3)
- publishAsync(t, "placeholder", "foo2")
+ time.Sleep(time.Second * 5)
+ publishAsync(t, "foo2")
}()
- // should be only makeMessage from the subscribed foo0 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg)
-
err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
}
@@ -733,6 +736,27 @@ func RPCWsPub(port string) func(t *testing.T) {
}
}()
+ go func() {
+ messagesToVerify := make([]string, 0, 10)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`)
+ i := 0
+ for {
+ _, msg, err2 := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err2)
+ assert.Equal(t, messagesToVerify[i], retMsg)
+ i++
+ if i == 3 {
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second)
+
d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
@@ -741,20 +765,11 @@ func RPCWsPub(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+ time.Sleep(time.Second)
publish("", "foo")
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+ time.Sleep(time.Second)
// //// LEAVE foo /////////
d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
@@ -765,12 +780,7 @@ func RPCWsPub(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+ time.Sleep(time.Second)
// TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
publish("", "foo")
@@ -780,12 +790,6 @@ func RPCWsPub(port string) func(t *testing.T) {
publish2(t, "", "foo2")
}()
- // should be only makeMessage from the subscribed foo2 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
-
err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
}
@@ -849,7 +853,7 @@ func RPCWsDeny(port string) func(t *testing.T) {
// ---------------------------------------------------------------------------------------------------
-func publish(command string, topics ...string) {
+func publish(topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -858,13 +862,13 @@ func publish(command string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP"), topics...), ret)
if err != nil {
panic(err)
}
}
-func publishAsync(t *testing.T, command string, topics ...string) {
+func publishAsync(t *testing.T, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -873,12 +877,12 @@ func publishAsync(t *testing.T, command string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello, PHP"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publish2(t *testing.T, command string, topics ...string) {
+func publish2(t *testing.T, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -887,7 +891,7 @@ func publish2(t *testing.T, command string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
@@ -900,12 +904,11 @@ func messageWS(command string, payload []byte, topics ...string) *websocketsv1.M
}
}
-func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request {
+func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
m := &websocketsv1.Request{
Messages: []*websocketsv1.Message{
{
Topics: topics,
- Command: command,
Payload: payload,
},
},