diff options
Diffstat (limited to 'tests/plugins/websockets')
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 294bbf0d..087d1bc9 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -219,12 +219,85 @@ func TestWSRedisAndMemory(t *testing.T) { time.Sleep(time.Second * 1) t.Run("RPCWsMemory", RPCWsMemory) t.Run("RPCWsRedis", RPCWsRedis) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) stopCh <- struct{}{} wg.Wait() } +func RPCWsMemoryPubAsync(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publishAsync("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publishAsync("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publishAsync2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + func RPCWsMemory(t *testing.T) { da := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, @@ -384,6 +457,36 @@ func publish(command string, broker string, topics ...string) { } } +func publishAsync(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func publishAsync2(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + func publish2(command string, broker string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { |