summaryrefslogtreecommitdiff
path: root/tests/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-28 14:03:41 +0300
committerValery Piashchynski <[email protected]>2021-05-28 14:03:41 +0300
commit8f13eb958c7eec49acba6e343edb77c6ede89f09 (patch)
tree8a8c3209d8890b630f378c69e32b37fe41696d6c /tests/plugins/websockets
parent9dd5a953754d72e259e79177b3e92e1f762f116a (diff)
- PubAsync tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/websockets')
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go103
1 files changed, 103 insertions, 0 deletions
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 294bbf0d..087d1bc9 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -219,12 +219,85 @@ func TestWSRedisAndMemory(t *testing.T) {
time.Sleep(time.Second * 1)
t.Run("RPCWsMemory", RPCWsMemory)
t.Run("RPCWsRedis", RPCWsRedis)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
stopCh <- struct{}{}
wg.Wait()
}
+func RPCWsMemoryPubAsync(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ _ = resp.Body.Close()
+ }()
+
+ d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publishAsync("", "memory", "foo")
+
+ // VERIFY a message
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP", retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publishAsync("", "memory", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 5)
+ publishAsync2("", "memory", "foo2")
+ }()
+
+ // should be only message from the subscribed foo2 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP2", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
func RPCWsMemory(t *testing.T) {
da := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
@@ -384,6 +457,36 @@ func publish(command string, broker string, topics ...string) {
}
}
+func publishAsync(command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var ret bool
+ err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func publishAsync2(command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var ret bool
+ err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ if err != nil {
+ panic(err)
+ }
+}
+
func publish2(command string, broker string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {