summaryrefslogtreecommitdiff
path: root/tests/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/websockets')
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-allow.yaml52
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-allow2.yaml54
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml48
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-deny.yaml48
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-deny2.yaml50
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml48
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis.yaml51
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-stop.yaml48
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go918
9 files changed, 0 insertions, 1317 deletions
diff --git a/tests/plugins/websockets/configs/.rr-websockets-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml
deleted file mode 100644
index 3d0268d4..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-allow.yaml
+++ /dev/null
@@ -1,52 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../worker-ok.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:41278
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-redis:
- addrs:
- - "127.0.0.1:6379"
-
-broadcast:
- test:
- driver: memory
- config: {}
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml
deleted file mode 100644
index f8e36136..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml
+++ /dev/null
@@ -1,54 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../worker-ok.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:41270
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-test:
- addrs:
- - "127.0.0.1:6379"
-
-broadcast:
- test:
- driver: redis
- config:
- addrs:
- - "127.0.0.1:6379"
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml
deleted file mode 100644
index c72e1f15..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:13235
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-broadcast:
- test1:
- driver: no
- config:
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny.yaml
deleted file mode 100644
index 61265c4b..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-deny.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../worker-deny.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:15587
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-broadcast:
- test:
- driver: memory
- config: {}
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml
deleted file mode 100644
index b99a3571..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml
+++ /dev/null
@@ -1,50 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../worker-deny.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:15588
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-broadcast:
- test:
- driver: redis
- config:
- addrs:
- - "127.0.0.1:6379"
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
deleted file mode 100644
index 3120f146..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:11111
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-broadcast:
- default:
- driver: memory
- config: {}
-
-websockets:
- broker: default
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
deleted file mode 100644
index fc01e0b1..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
+++ /dev/null
@@ -1,51 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:13235
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-test:
- addrs:
- - "127.0.0.1:6379"
-
-broadcast:
- test:
- driver: redis
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-stop.yaml
deleted file mode 100644
index 35529e9e..00000000
--- a/tests/plugins/websockets/configs/.rr-websockets-stop.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-rpc:
- listen: tcp://127.0.0.1:6001
-
-server:
- command: "php ../../worker-stop.php"
- user: ""
- group: ""
- relay: "pipes"
- relay_timeout: "20s"
-
-http:
- address: 127.0.0.1:11114
- max_request_size: 1024
- middleware: ["websockets"]
- trusted_subnets:
- [
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- ]
- pool:
- num_workers: 2
- max_jobs: 0
- allocate_timeout: 60s
- destroy_timeout: 60s
-
-broadcast:
- test:
- driver: memory
- config: {}
-
-websockets:
- broker: test
- allowed_origin: "*"
- path: "/ws"
-
-logs:
- mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
deleted file mode 100644
index 3e74ca59..00000000
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ /dev/null
@@ -1,918 +0,0 @@
-package websockets
-
-import (
- "net"
- "net/http"
- "net/rpc"
- "net/url"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "testing"
- "time"
-
- "github.com/fasthttp/websocket"
- json "github.com/json-iterator/go"
- endure "github.com/spiral/endure/pkg/container"
- goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/config"
- httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/memory"
- "github.com/spiral/roadrunner/v2/plugins/redis"
- rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/websockets"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/utils"
- "github.com/stretchr/testify/assert"
-)
-
-func TestWebsocketsInit(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-init.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &redis.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &memory.Plugin{},
- &broadcast.Plugin{},
- )
-
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("TestWSInit", wsInit)
- t.Run("RPCWsMemoryPubAsync", RPCWsPubAsync("11111"))
- t.Run("RPCWsMemory", RPCWsPub("11111"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func TestWSRedis(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-redis.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &redis.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("RPCWsRedisPubAsync", RPCWsPubAsync("13235"))
- t.Run("RPCWsRedisPub", RPCWsPub("13235"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func TestWSRedisNoSection(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-broker-no-section.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &redis.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = cont.Serve()
- assert.Error(t, err)
-}
-
-func TestWSDeny(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-deny.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &memory.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryDeny", RPCWsDeny("15587"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func TestWSDeny2(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-deny2.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &redis.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("RPCWsRedisDeny", RPCWsDeny("15588"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func TestWSStop(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-stop.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &redis.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &memory.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("RPCWsStop", RPCWsMemoryStop("11114"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func RPCWsMemoryStop(port string) func(t *testing.T) {
- return func(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NotNil(t, resp)
- assert.Error(t, err)
- assert.Nil(t, c)
- assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck
- assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck
- if resp != nil && resp.Body != nil { //nolint:staticcheck
- _ = resp.Body.Close()
- }
- }
-}
-
-func TestWSAllow(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-allow.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &redis.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &memory.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryAllow", RPCWsPub("41278"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func TestWSAllow2(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- assert.NoError(t, err)
-
- cfg := &config.Viper{
- Path: "configs/.rr-websockets-allow2.yaml",
- Prefix: "rr",
- }
-
- err = cont.RegisterAll(
- cfg,
- &rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- &server.Plugin{},
- &redis.Plugin{},
- &websockets.Plugin{},
- &httpPlugin.Plugin{},
- &memory.Plugin{},
- &broadcast.Plugin{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- if err != nil {
- t.Fatal(err)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryAllow", RPCWsPub("41270"))
-
- stopCh <- struct{}{}
-
- wg.Wait()
-}
-
-func wsInit(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:11111", Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- _ = resp.Body.Close()
- }()
-
- d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
-}
-
-func RPCWsPubAsync(port string) func(t *testing.T) {
- return func(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- if resp != nil && resp.Body != nil {
- _ = resp.Body.Close()
- }
- }()
-
- go func() {
- messagesToVerify := make([]string, 0, 4)
- messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`)
- messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`)
- messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`)
- messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`)
- i := 0
- for {
- _, msg, err2 := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err2)
- assert.Equal(t, messagesToVerify[i], retMsg)
- i++
- if i == 3 {
- return
- }
- }
- }()
-
- time.Sleep(time.Second)
-
- d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- publishAsync(t, "foo")
-
- time.Sleep(time.Second)
-
- // //// LEAVE foo /////////
- d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publishAsync(t, "foo")
-
- go func() {
- time.Sleep(time.Second * 5)
- publishAsync(t, "foo2")
- }()
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
- }
-}
-
-func RPCWsPub(port string) func(t *testing.T) {
- return func(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- if resp != nil && resp.Body != nil {
- _ = resp.Body.Close()
- }
- }()
-
- go func() {
- messagesToVerify := make([]string, 0, 10)
- messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`)
- messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`)
- messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`)
- messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`)
- i := 0
- for {
- _, msg, err2 := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err2)
- assert.Equal(t, messagesToVerify[i], retMsg)
- i++
- if i == 3 {
- return
- }
- }
- }()
-
- time.Sleep(time.Second)
-
- d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- publish("", "foo")
-
- time.Sleep(time.Second)
-
- // //// LEAVE foo /////////
- d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- time.Sleep(time.Second)
-
- // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publish("", "foo")
-
- go func() {
- time.Sleep(time.Second * 5)
- publish2(t, "", "foo2")
- }()
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
- }
-}
-
-func RPCWsDeny(port string) func(t *testing.T) {
- return func(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
- assert.NotNil(t, c)
- assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
-
- defer func() {
- if resp != nil && resp.Body != nil {
- _ = resp.Body.Close()
- }
- }()
-
- d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
-
- // //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
- }
-}
-
-// ---------------------------------------------------------------------------------------------------
-
-func publish(topics ...string) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- if err != nil {
- panic(err)
- }
-
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- ret := &websocketsv1.Response{}
- err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP"), topics...), ret)
- if err != nil {
- panic(err)
- }
-}
-
-func publishAsync(t *testing.T, topics ...string) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- if err != nil {
- panic(err)
- }
-
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- ret := &websocketsv1.Response{}
- err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello, PHP"), topics...), ret)
- assert.NoError(t, err)
- assert.True(t, ret.Ok)
-}
-
-func publish2(t *testing.T, topics ...string) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- if err != nil {
- panic(err)
- }
-
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
-
- ret := &websocketsv1.Response{}
- err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP2"), topics...), ret)
- assert.NoError(t, err)
- assert.True(t, ret.Ok)
-}
-
-func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message {
- return &websocketsv1.Message{
- Topics: topics,
- Command: command,
- Payload: payload,
- }
-}
-
-func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
- m := &websocketsv1.Request{
- Messages: []*websocketsv1.Message{
- {
- Topics: topics,
- Payload: payload,
- },
- },
- }
-
- return m
-}