summaryrefslogtreecommitdiff
path: root/tests/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
commitfe7bb0fe758d573fe353df028257ed66c6eccf66 (patch)
tree74392f8e61e96c85f0d8b684cfc08e3fc3664ae9 /tests/plugins
parent68ff941c4226074206ceed9c30bd95317aa0e9fc (diff)
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins')
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go7
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-init.yaml14
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml12
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml7
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml9
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml9
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml13
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml3
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml)9
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go65
10 files changed, 87 insertions, 61 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index 65ee4415..5b195bd0 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -98,8 +98,7 @@ func TestBroadcastInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 1)
- //t.Run("TestWSInit", wsInit)
+ t.Run("TestWSInit", wsInit)
stopCh <- struct{}{}
@@ -186,7 +185,7 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
return &websocketsv1.Message{
Topics: topics,
Command: command,
- Broker: broker,
+ //Broker: broker,
Payload: payload,
}
}
@@ -197,7 +196,7 @@ func makeMessage(command string, broker string, payload []byte, topics ...string
{
Topics: topics,
Command: command,
- Broker: broker,
+ //Broker: broker,
Payload: payload,
},
},
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
index fa4116d0..6962eeb5 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
@@ -23,19 +23,21 @@ redis:
addrs:
- "localhost:6379"
+
broadcast:
- default:
- driver: redis
- test:
- driver: memory
+ default:
+ driver: redis
+ addrs:
+ - "localhost:6379"
websockets:
- pubsubs: [ "redis" ]
+ broker: default
+ allowed_origin: "*"
path: "/ws"
logs:
mode: development
- level: error
+ level: debug
endure:
grace_period: 120s
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
index dc073be3..b6882d84 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -23,12 +23,18 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ default:
+ driver: redis
+ addrs:
+ - "localhost:6379"
+
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
- # pubsubs might use general config section or its own
- pubsubs: [ "redis" ]
+ broker: default
+ allowed_origin: "*"
path: "/ws"
+
logs:
mode: development
level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
index 896cee05..f81e13e3 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
@@ -23,8 +23,13 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ test:
+ driver: memory
+
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
index e3bf5218..decb7dcf 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
@@ -19,12 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
index 0614f4e7..5377aef2 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
@@ -19,12 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
index 27eab557..a077bf9e 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
@@ -20,17 +20,18 @@ http:
destroy_timeout: 60s
-websockets:
- pubsubs: [ "redis", "memory" ]
- redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
+websockets:
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
mode: development
- level: error
+ level: debug
endure:
grace_period: 120s
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
index fd125794..d80993f2 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
@@ -21,7 +21,8 @@ http:
websockets:
- pubsubs: [ "redis", "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
index eedf5377..3557f5f1 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
@@ -23,10 +23,13 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ test:
+ driver: redis
+
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
- # pubsubs might use general config section
- pubsubs: [ "redis", "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 8321297d..cb78117f 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -17,6 +17,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -47,6 +48,7 @@ func TestBroadcastInit(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -98,6 +100,8 @@ func TestBroadcastInit(t *testing.T) {
time.Sleep(time.Second * 1)
t.Run("TestWSInit", wsInit)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
+ t.Run("RPCWsMemory", RPCWsMemory)
stopCh <- struct{}{}
@@ -119,7 +123,7 @@ func wsInit(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -156,6 +160,7 @@ func TestWSRedisAndMemory(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -219,7 +224,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-redis-memory.yaml",
+ Path: "configs/.rr-websockets-redis.yaml",
Prefix: "rr",
}
@@ -232,6 +237,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -281,8 +287,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
- t.Run("RPCWsMemory", RPCWsMemory)
+
t.Run("RPCWsRedis", RPCWsRedis)
stopCh <- struct{}{}
@@ -308,6 +313,7 @@ func TestWSRedisNoSection(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -326,7 +332,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
HandshakeTimeout: time.Second * 20,
}
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
c, resp, err := da.Dial(connURL.String(), nil)
assert.NoError(t, err)
@@ -335,7 +341,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -359,7 +365,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -386,7 +392,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
+ assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg)
err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
@@ -398,7 +404,7 @@ func RPCWsMemory(t *testing.T) {
HandshakeTimeout: time.Second * 20,
}
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
c, resp, err := da.Dial(connURL.String(), nil)
assert.NoError(t, err)
@@ -409,7 +415,7 @@ func RPCWsMemory(t *testing.T) {
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -433,7 +439,7 @@ func RPCWsMemory(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -481,7 +487,7 @@ func RPCWsRedis(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -505,7 +511,7 @@ func RPCWsRedis(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -556,6 +562,7 @@ func TestWSMemoryDeny(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -631,7 +638,7 @@ func RPCWsMemoryDeny(t *testing.T) {
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -647,7 +654,7 @@ func RPCWsMemoryDeny(t *testing.T) {
assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -684,6 +691,7 @@ func TestWSMemoryStop(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -777,6 +785,7 @@ func TestWSMemoryOk(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -852,7 +861,7 @@ func RPCWsMemoryAllow(t *testing.T) {
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -876,7 +885,7 @@ func RPCWsMemoryAllow(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -909,7 +918,7 @@ func RPCWsMemoryAllow(t *testing.T) {
assert.NoError(t, err)
}
-func publish(command string, broker string, topics ...string) {
+func publish(command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -918,13 +927,13 @@ func publish(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret)
if err != nil {
panic(err)
}
}
-func publishAsync(t *testing.T, command string, broker string, topics ...string) {
+func publishAsync(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -933,12 +942,12 @@ func publishAsync(t *testing.T, command string, broker string, topics ...string)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publishAsync2(t *testing.T, command string, broker string, topics ...string) {
+func publishAsync2(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -947,12 +956,12 @@ func publishAsync2(t *testing.T, command string, broker string, topics ...string
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publish2(t *testing.T, command string, broker string, topics ...string) {
+func publish2(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -961,27 +970,25 @@ func publish2(t *testing.T, command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message {
return &websocketsv1.Message{
Topics: topics,
Command: command,
- Broker: broker,
Payload: payload,
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request {
+func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request {
m := &websocketsv1.Request{
Messages: []*websocketsv1.Message{
{
Topics: topics,
Command: command,
- Broker: broker,
Payload: payload,
},
},