diff options
author | Valery Piashchynski <[email protected]> | 2021-06-24 17:40:49 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-24 17:40:49 +0300 |
commit | e9249c7896331bab97a18a7ee0db17803fdd31fb (patch) | |
tree | 99512001f757eb88614acb9b20dada3200008a5d /tests | |
parent | ce53a8e149b76f15e8a5dd88ac3b953798d57e8b (diff) | |
parent | 60001dbe15b5ff0fec32239ad18b3d308a4150b5 (diff) |
#736 feat(kv): `clear` RPC method which completely cleans storagev2.3.1-beta.6
#736 feat(kv): `clear` RPC method which completely cleans storage
Diffstat (limited to 'tests')
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 6 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 189 | ||||
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 123 |
3 files changed, 256 insertions, 62 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 2cd4b451..0ec813f3 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -205,7 +205,8 @@ func TestBroadcastSameSubscriber(t *testing.T) { cfg, &broadcast.Plugin{}, &rpcPlugin.Plugin{}, - mockLogger, + &logger.ZapLogger{}, + // mockLogger, &server.Plugin{}, &redis.Plugin{}, &websockets.Plugin{}, @@ -314,7 +315,8 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { cfg, &broadcast.Plugin{}, &rpcPlugin.Plugin{}, - mockLogger, + &logger.ZapLogger{}, + // mockLogger, &server.Plugin{}, &redis.Plugin{}, &websockets.Plugin{}, diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 1e466e06..ced1c5fe 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -575,6 +575,53 @@ func testRPCMethods(t *testing.T) { err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) assert.Len(t, ret.GetItems(), 0) + + dataClear := &payload.Request{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: []byte("aa"), + }, + { + Key: "b", + Value: []byte("bb"), + }, + { + Key: "c", + Value: []byte("cc"), + }, + { + Key: "d", + Value: []byte("dd"), + }, + { + Key: "e", + Value: []byte("ee"), + }, + }, + } + + clear := &payload.Request{Storage: "boltdb-rr"} + + ret = &payload.Response{} + // Register 3 keys with values + err = client.Call("kv.Set", dataClear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 5) // should be 5 + + ret = &payload.Response{} + err = client.Call("kv.Clear", clear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 0) // should be 5 } func TestMemcached(t *testing.T) { @@ -790,6 +837,54 @@ func testRPCMethodsMemcached(t *testing.T) { err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) assert.Len(t, ret.GetItems(), 0) + + dataClear := &payload.Request{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: []byte("aa"), + }, + { + Key: "b", + Value: []byte("bb"), + }, + { + Key: "c", + Value: []byte("cc"), + }, + { + Key: "d", + Value: []byte("dd"), + }, + { + Key: "e", + Value: []byte("ee"), + }, + }, + } + + clear := &payload.Request{Storage: "memcached-rr"} + + ret = &payload.Response{} + // Register 3 keys with values + err = client.Call("kv.Set", dataClear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 5) // should be 5 + + ret = &payload.Response{} + err = client.Call("kv.Clear", clear, ret) + assert.NoError(t, err) + + time.Sleep(time.Second * 2) + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 0) // should be 5 } func TestInMemory(t *testing.T) { @@ -1004,6 +1099,53 @@ func testRPCMethodsInMemory(t *testing.T) { err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) assert.Len(t, ret.GetItems(), 0) + + dataClear := &payload.Request{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: []byte("aa"), + }, + { + Key: "b", + Value: []byte("bb"), + }, + { + Key: "c", + Value: []byte("cc"), + }, + { + Key: "d", + Value: []byte("dd"), + }, + { + Key: "e", + Value: []byte("ee"), + }, + }, + } + + clear := &payload.Request{Storage: "memory-rr"} + + ret = &payload.Response{} + // Register 3 keys with values + err = client.Call("kv.Set", dataClear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 5) // should be 5 + + ret = &payload.Response{} + err = client.Call("kv.Clear", clear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 0) // should be 5 } func TestRedis(t *testing.T) { @@ -1354,4 +1496,51 @@ func testRPCMethodsRedis(t *testing.T) { err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) assert.Len(t, ret.GetItems(), 0) + + dataClear := &payload.Request{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: []byte("aa"), + }, + { + Key: "b", + Value: []byte("bb"), + }, + { + Key: "c", + Value: []byte("cc"), + }, + { + Key: "d", + Value: []byte("dd"), + }, + { + Key: "e", + Value: []byte("ee"), + }, + }, + } + + clear := &payload.Request{Storage: "redis-rr"} + + ret = &payload.Response{} + // Register 3 keys with values + err = client.Call("kv.Set", dataClear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 5) // should be 5 + + ret = &payload.Response{} + err = client.Call("kv.Clear", clear, ret) + assert.NoError(t, err) + + ret = &payload.Response{} + err = client.Call("kv.Has", dataClear, ret) + assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 0) // should be 5 } diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 5ed0c3f3..53b6a572 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -645,7 +645,7 @@ func RPCWsPubAsync(port string) func(t *testing.T) { return func(t *testing.T) { da := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 18, + HandshakeTimeout: time.Second * 20, } connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} @@ -654,9 +654,32 @@ func RPCWsPubAsync(port string) func(t *testing.T) { assert.NoError(t, err) defer func() { - _ = resp.Body.Close() + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + go func() { + messagesToVerify := make([]string, 0, 10) + messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`) + messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`) + messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`) + messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`) + i := 0 + for { + _, msg, err2 := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err2) + assert.Equal(t, messagesToVerify[i], retMsg) + i++ + if i == 3 { + return + } + } }() + time.Sleep(time.Second) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) @@ -665,20 +688,11 @@ func RPCWsPubAsync(port string) func(t *testing.T) { err = c.WriteMessage(websocket.BinaryMessage, d) assert.NoError(t, err) - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) + time.Sleep(time.Second) - // subscription done - assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + publishAsync(t, "foo") - publishAsync(t, "placeholder", "foo") - - // VERIFY a makeMessage - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) + time.Sleep(time.Second) // //// LEAVE foo ///////// d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) @@ -689,27 +703,16 @@ func RPCWsPubAsync(port string) func(t *testing.T) { err = c.WriteMessage(websocket.BinaryMessage, d) assert.NoError(t, err) - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + time.Sleep(time.Second) // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC - publishAsync(t, "placeholder", "foo") + publishAsync(t, "foo") go func() { - time.Sleep(time.Second * 3) - publishAsync(t, "placeholder", "foo2") + time.Sleep(time.Second * 5) + publishAsync(t, "foo2") }() - // should be only makeMessage from the subscribed foo0 topic - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg) - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) assert.NoError(t, err) } @@ -733,6 +736,27 @@ func RPCWsPub(port string) func(t *testing.T) { } }() + go func() { + messagesToVerify := make([]string, 0, 10) + messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`) + messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`) + messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`) + messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`) + i := 0 + for { + _, msg, err2 := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err2) + assert.Equal(t, messagesToVerify[i], retMsg) + i++ + if i == 3 { + return + } + } + }() + + time.Sleep(time.Second) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) @@ -741,20 +765,11 @@ func RPCWsPub(port string) func(t *testing.T) { err = c.WriteMessage(websocket.BinaryMessage, d) assert.NoError(t, err) - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + time.Sleep(time.Second) publish("", "foo") - // VERIFY a makeMessage - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) + time.Sleep(time.Second) // //// LEAVE foo ///////// d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) @@ -765,12 +780,7 @@ func RPCWsPub(port string) func(t *testing.T) { err = c.WriteMessage(websocket.BinaryMessage, d) assert.NoError(t, err) - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + time.Sleep(time.Second) // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC publish("", "foo") @@ -780,12 +790,6 @@ func RPCWsPub(port string) func(t *testing.T) { publish2(t, "", "foo2") }() - // should be only makeMessage from the subscribed foo2 topic - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) assert.NoError(t, err) } @@ -849,7 +853,7 @@ func RPCWsDeny(port string) func(t *testing.T) { // --------------------------------------------------------------------------------------------------- -func publish(command string, topics ...string) { +func publish(topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -858,13 +862,13 @@ func publish(command string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP"), topics...), ret) if err != nil { panic(err) } } -func publishAsync(t *testing.T, command string, topics ...string) { +func publishAsync(t *testing.T, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -873,12 +877,12 @@ func publishAsync(t *testing.T, command string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret) + err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello, PHP"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func publish2(t *testing.T, command string, topics ...string) { +func publish2(t *testing.T, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -887,7 +891,7 @@ func publish2(t *testing.T, command string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP2"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } @@ -900,12 +904,11 @@ func messageWS(command string, payload []byte, topics ...string) *websocketsv1.M } } -func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request { +func makeMessage(payload []byte, topics ...string) *websocketsv1.Request { m := &websocketsv1.Request{ Messages: []*websocketsv1.Message{ { Topics: topics, - Command: command, Payload: payload, }, }, |