From fe7bb0fe758d573fe353df028257ed66c6eccf66 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 18 Jun 2021 01:06:16 +0300 Subject: - Rework main parts Signed-off-by: Valery Piashchynski --- tests/plugins/broadcast/broadcast_plugin_test.go | 7 +-- .../broadcast/configs/.rr-broadcast-init.yaml | 14 +++-- .../websockets/configs/.rr-websockets-init.yaml | 12 +++- .../configs/.rr-websockets-memory-allow.yaml | 7 ++- .../configs/.rr-websockets-memory-deny.yaml | 9 +-- .../configs/.rr-websockets-memory-stop.yaml | 9 +-- .../configs/.rr-websockets-redis-memory-local.yaml | 13 +++-- .../configs/.rr-websockets-redis-memory.yaml | 39 ------------- .../configs/.rr-websockets-redis-no-section.yaml | 3 +- .../websockets/configs/.rr-websockets-redis.yaml | 42 ++++++++++++++ tests/plugins/websockets/websocket_plugin_test.go | 65 ++++++++++++---------- 11 files changed, 123 insertions(+), 97 deletions(-) delete mode 100644 tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml create mode 100644 tests/plugins/websockets/configs/.rr-websockets-redis.yaml (limited to 'tests/plugins') diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 65ee4415..5b195bd0 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -98,8 +98,7 @@ func TestBroadcastInit(t *testing.T) { } }() - time.Sleep(time.Second * 1) - //t.Run("TestWSInit", wsInit) + t.Run("TestWSInit", wsInit) stopCh <- struct{}{} @@ -186,7 +185,7 @@ func messageWS(command string, broker string, payload []byte, topics ...string) return &websocketsv1.Message{ Topics: topics, Command: command, - Broker: broker, + //Broker: broker, Payload: payload, } } @@ -197,7 +196,7 @@ func makeMessage(command string, broker string, payload []byte, topics ...string { Topics: topics, Command: command, - Broker: broker, + //Broker: broker, Payload: payload, }, }, diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml index fa4116d0..6962eeb5 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml @@ -23,19 +23,21 @@ redis: addrs: - "localhost:6379" + broadcast: - default: - driver: redis - test: - driver: memory + default: + driver: redis + addrs: + - "localhost:6379" websockets: - pubsubs: [ "redis" ] + broker: default + allowed_origin: "*" path: "/ws" logs: mode: development - level: error + level: debug endure: grace_period: 120s diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml index dc073be3..b6882d84 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -23,12 +23,18 @@ redis: addrs: - "localhost:6379" +broadcast: + default: + driver: redis + addrs: + - "localhost:6379" + websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - # pubsubs might use general config section or its own - pubsubs: [ "redis" ] + broker: default + allowed_origin: "*" path: "/ws" + logs: mode: development level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml index 896cee05..f81e13e3 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml @@ -23,8 +23,13 @@ redis: addrs: - "localhost:6379" +broadcast: + test: + driver: memory + websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml index e3bf5218..decb7dcf 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml @@ -19,12 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml index 0614f4e7..5377aef2 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml @@ -19,12 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml index 27eab557..a077bf9e 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml @@ -20,17 +20,18 @@ http: destroy_timeout: 60s -websockets: - pubsubs: [ "redis", "memory" ] - redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory +websockets: + broker: test + allowed_origin: "*" path: "/ws" logs: mode: development - level: error + level: debug endure: grace_period: 120s diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml deleted file mode 100644 index eedf5377..00000000 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml +++ /dev/null @@ -1,39 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../psr-worker-bench.php" - user: "" - group: "" - relay: "pipes" - relay_timeout: "20s" - -http: - address: 127.0.0.1:13235 - max_request_size: 1024 - middleware: [ "websockets" ] - trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] - pool: - num_workers: 2 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -redis: - addrs: - - "localhost:6379" - -websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - # pubsubs might use general config section - pubsubs: [ "redis", "memory" ] - path: "/ws" - -logs: - mode: development - level: error - -endure: - grace_period: 120s - print_graph: false - log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml index fd125794..d80993f2 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml @@ -21,7 +21,8 @@ http: websockets: - pubsubs: [ "redis", "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml new file mode 100644 index 00000000..3557f5f1 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml @@ -0,0 +1,42 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:13235 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +broadcast: + test: + driver: redis + +websockets: + broker: test + allowed_origin: "*" + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 8321297d..cb78117f 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -17,6 +17,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -47,6 +48,7 @@ func TestBroadcastInit(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -98,6 +100,8 @@ func TestBroadcastInit(t *testing.T) { time.Sleep(time.Second * 1) t.Run("TestWSInit", wsInit) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) + t.Run("RPCWsMemory", RPCWsMemory) stopCh <- struct{}{} @@ -119,7 +123,7 @@ func wsInit(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -156,6 +160,7 @@ func TestWSRedisAndMemory(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -219,7 +224,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-redis-memory.yaml", + Path: "configs/.rr-websockets-redis.yaml", Prefix: "rr", } @@ -232,6 +237,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -281,8 +287,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) - t.Run("RPCWsMemory", RPCWsMemory) + t.Run("RPCWsRedis", RPCWsRedis) stopCh <- struct{}{} @@ -308,6 +313,7 @@ func TestWSRedisNoSection(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -326,7 +332,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -335,7 +341,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -359,7 +365,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -386,7 +392,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) + assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg) err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) assert.NoError(t, err) @@ -398,7 +404,7 @@ func RPCWsMemory(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -409,7 +415,7 @@ func RPCWsMemory(t *testing.T) { } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -433,7 +439,7 @@ func RPCWsMemory(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -481,7 +487,7 @@ func RPCWsRedis(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -505,7 +511,7 @@ func RPCWsRedis(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -556,6 +562,7 @@ func TestWSMemoryDeny(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -631,7 +638,7 @@ func RPCWsMemoryDeny(t *testing.T) { } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -647,7 +654,7 @@ func RPCWsMemoryDeny(t *testing.T) { assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -684,6 +691,7 @@ func TestWSMemoryStop(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -777,6 +785,7 @@ func TestWSMemoryOk(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -852,7 +861,7 @@ func RPCWsMemoryAllow(t *testing.T) { } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -876,7 +885,7 @@ func RPCWsMemoryAllow(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -909,7 +918,7 @@ func RPCWsMemoryAllow(t *testing.T) { assert.NoError(t, err) } -func publish(command string, broker string, topics ...string) { +func publish(command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -918,13 +927,13 @@ func publish(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret) if err != nil { panic(err) } } -func publishAsync(t *testing.T, command string, broker string, topics ...string) { +func publishAsync(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -933,12 +942,12 @@ func publishAsync(t *testing.T, command string, broker string, topics ...string) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) + err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func publishAsync2(t *testing.T, command string, broker string, topics ...string) { +func publishAsync2(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -947,12 +956,12 @@ func publishAsync2(t *testing.T, command string, broker string, topics ...string client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP2"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func publish2(t *testing.T, command string, broker string, topics ...string) { +func publish2(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -961,27 +970,25 @@ func publish2(t *testing.T, command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { +func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message { return &websocketsv1.Message{ Topics: topics, Command: command, - Broker: broker, Payload: payload, } } -func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request { +func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request { m := &websocketsv1.Request{ Messages: []*websocketsv1.Message{ { Topics: topics, Command: command, - Broker: broker, Payload: payload, }, }, -- cgit v1.2.3