diff options
author | Valery Piashchynski <[email protected]> | 2021-06-21 08:49:47 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-21 08:49:47 +0300 |
commit | 591b69be9279158cfe9e7083152d308c4ee480b7 (patch) | |
tree | 5c6b1d4ee2aea4e6a1cc828ca1fcb2306ef9741e /tests/plugins | |
parent | 25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff) | |
parent | 2ab22ac9e935efb126b51e9c3521073e6a5155a1 (diff) |
#732 feat(plugin): rework `broadcast` plugin
#732 feat(plugin): rework `broadcast` plugin
Diffstat (limited to 'tests/plugins')
22 files changed, 1455 insertions, 420 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go new file mode 100644 index 00000000..2cd4b451 --- /dev/null +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -0,0 +1,474 @@ +package broadcast + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory" + "github.com/spiral/roadrunner/v2/plugins/redis" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/websockets" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/spiral/roadrunner/v2/tests/plugins/broadcast/plugins" + "github.com/stretchr/testify/assert" +) + +func TestBroadcastInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + stopCh <- struct{}{} + + wg.Wait() +} + +func TestBroadcastConfigError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-config-error.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + + &plugins.Plugin1{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + assert.Error(t, err) +} + +func TestBroadcastNoConfig(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-no-config.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", []string{}).MinTimes(1) + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + // should be just disabled + _, err = cont.Serve() + assert.NoError(t, err) +} + +func TestBroadcastSameSubscriber(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-same-section.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1) + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3) + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + + // test - redis + // test2 - redis (port 6378) + // test3 - memory + // test4 - memory + &plugins.Plugin1{}, // foo, foo2, foo3 test + &plugins.Plugin2{}, // foo, test + &plugins.Plugin3{}, // foo, test2 + &plugins.Plugin4{}, // foo, test3 + &plugins.Plugin5{}, // foo, test4 + &plugins.Plugin6{}, // foo, test3 + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) + + time.Sleep(time.Second * 4) + stopCh <- struct{}{} + + wg.Wait() +} + +func TestBroadcastSameSubscriberGlobal(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-global.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1) + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3) + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + + // test - redis + // test2 - redis (port 6378) + // test3 - memory + // test4 - memory + &plugins.Plugin1{}, // foo, foo2, foo3 test + &plugins.Plugin2{}, // foo, test + &plugins.Plugin3{}, // foo, test2 + &plugins.Plugin4{}, // foo, test3 + &plugins.Plugin5{}, // foo, test4 + &plugins.Plugin6{}, // foo, test3 + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) + + time.Sleep(time.Second * 4) + stopCh <- struct{}{} + + wg.Wait() +} + +func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) + if err != nil { + t.Fatal(err) + } + } +} + +func BroadcastPublishFoo2(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo"), ret) + if err != nil { + t.Fatal(err) + } + } +} + +func BroadcastPublishFoo3(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo3"), ret) + if err != nil { + t.Fatal(err) + } + } +} +func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) + if err != nil { + t.Fatal(err) + } + } +} + +func makeMessage(payload []byte, topics ...string) *websocketsv1.Request { + m := &websocketsv1.Request{ + Messages: []*websocketsv1.Message{ + { + Topics: topics, + Payload: payload, + }, + }, + } + + return m +} diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml new file mode 100644 index 00000000..d8daa251 --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml @@ -0,0 +1,33 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21345 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +broadcast: + default: + driver: redis + +logs: + mode: development + level: debug + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml index eedf5377..2ca97055 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml @@ -1,5 +1,5 @@ rpc: - listen: tcp://127.0.0.1:6001 + listen: tcp://127.0.0.1:6003 server: command: "php ../../psr-worker-bench.php" @@ -9,7 +9,7 @@ server: relay_timeout: "20s" http: - address: 127.0.0.1:13235 + address: 127.0.0.1:21543 max_request_size: 1024 middleware: [ "websockets" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] @@ -23,11 +23,18 @@ redis: addrs: - "localhost:6379" -websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - # pubsubs might use general config section - pubsubs: [ "redis", "memory" ] - path: "/ws" +broadcast: + test: + driver: redis + test2: + driver: redis + addrs: + - "localhost:6378" + test3: + driver: memory + test4: + driver: memory + logs: mode: development diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml new file mode 100644 index 00000000..aa80330e --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml @@ -0,0 +1,35 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21345 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +broadcast: + default: + driver: redis + addrs: + - "localhost:6379" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml new file mode 100644 index 00000000..90790869 --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml @@ -0,0 +1,29 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21345 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +logs: + mode: development + level: debug + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml new file mode 100644 index 00000000..360e05e5 --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6002 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21345 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +broadcast: + test: + driver: redis + addrs: + - "localhost:6379" + test2: + driver: redis + addrs: + - "localhost:6378" + test3: + driver: memory + test4: + driver: memory + +logs: + mode: development + level: debug + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go new file mode 100644 index 00000000..d3b16256 --- /dev/null +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -0,0 +1,67 @@ +package plugins + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const Plugin1Name = "plugin1" + +type Plugin1 struct { + log logger.Logger + b broadcast.Broadcaster + driver pubsub.SubReader +} + +func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { + p.log = log + p.b = b + return nil +} + +func (p *Plugin1) Serve() chan error { + errCh := make(chan error, 1) + + var err error + p.driver, err = p.b.GetDriver("test") + if err != nil { + errCh <- err + return errCh + } + + err = p.driver.Subscribe("1", "foo", "foo2", "foo3") + if err != nil { + panic(err) + } + + go func() { + for { + msg, err := p.driver.Next() + if err != nil { + panic(err) + } + + if msg == nil { + continue + } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + } + }() + + return errCh +} + +func (p *Plugin1) Stop() error { + _ = p.driver.Unsubscribe("1", "foo") + _ = p.driver.Unsubscribe("1", "foo2") + _ = p.driver.Unsubscribe("1", "foo3") + return nil +} + +func (p *Plugin1) Name() string { + return Plugin1Name +} diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go new file mode 100644 index 00000000..2bd819d2 --- /dev/null +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -0,0 +1,64 @@ +package plugins + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const Plugin2Name = "plugin2" + +type Plugin2 struct { + log logger.Logger + b broadcast.Broadcaster + driver pubsub.SubReader +} + +func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { + p.log = log + p.b = b + return nil +} + +func (p *Plugin2) Serve() chan error { + errCh := make(chan error, 1) + + var err error + p.driver, err = p.b.GetDriver("test") + if err != nil { + panic(err) + } + + err = p.driver.Subscribe("2", "foo") + if err != nil { + panic(err) + } + + go func() { + for { + msg, err := p.driver.Next() + if err != nil { + panic(err) + } + + if msg == nil { + continue + } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + } + }() + + return errCh +} + +func (p *Plugin2) Stop() error { + _ = p.driver.Unsubscribe("2", "foo") + return nil +} + +func (p *Plugin2) Name() string { + return Plugin2Name +} diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go new file mode 100644 index 00000000..ef926222 --- /dev/null +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -0,0 +1,64 @@ +package plugins + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const Plugin3Name = "plugin3" + +type Plugin3 struct { + log logger.Logger + b broadcast.Broadcaster + driver pubsub.SubReader +} + +func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { + p.log = log + p.b = b + return nil +} + +func (p *Plugin3) Serve() chan error { + errCh := make(chan error, 1) + + var err error + p.driver, err = p.b.GetDriver("test2") + if err != nil { + panic(err) + } + + err = p.driver.Subscribe("3", "foo") + if err != nil { + panic(err) + } + + go func() { + for { + msg, err := p.driver.Next() + if err != nil { + panic(err) + } + + if msg == nil { + continue + } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + } + }() + + return errCh +} + +func (p *Plugin3) Stop() error { + _ = p.driver.Unsubscribe("3", "foo") + return nil +} + +func (p *Plugin3) Name() string { + return Plugin3Name +} diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go new file mode 100644 index 00000000..c9b94777 --- /dev/null +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -0,0 +1,64 @@ +package plugins + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const Plugin4Name = "plugin4" + +type Plugin4 struct { + log logger.Logger + b broadcast.Broadcaster + driver pubsub.SubReader +} + +func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { + p.log = log + p.b = b + return nil +} + +func (p *Plugin4) Serve() chan error { + errCh := make(chan error, 1) + + var err error + p.driver, err = p.b.GetDriver("test3") + if err != nil { + panic(err) + } + + err = p.driver.Subscribe("4", "foo") + if err != nil { + panic(err) + } + + go func() { + for { + msg, err := p.driver.Next() + if err != nil { + panic(err) + } + + if msg == nil { + continue + } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + } + }() + + return errCh +} + +func (p *Plugin4) Stop() error { + _ = p.driver.Unsubscribe("4", "foo") + return nil +} + +func (p *Plugin4) Name() string { + return Plugin4Name +} diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go new file mode 100644 index 00000000..01562a8f --- /dev/null +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -0,0 +1,64 @@ +package plugins + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const Plugin5Name = "plugin5" + +type Plugin5 struct { + log logger.Logger + b broadcast.Broadcaster + driver pubsub.SubReader +} + +func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { + p.log = log + p.b = b + return nil +} + +func (p *Plugin5) Serve() chan error { + errCh := make(chan error, 1) + + var err error + p.driver, err = p.b.GetDriver("test4") + if err != nil { + panic(err) + } + + err = p.driver.Subscribe("5", "foo") + if err != nil { + panic(err) + } + + go func() { + for { + msg, err := p.driver.Next() + if err != nil { + panic(err) + } + + if msg == nil { + continue + } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + } + }() + + return errCh +} + +func (p *Plugin5) Stop() error { + _ = p.driver.Unsubscribe("5", "foo") + return nil +} + +func (p *Plugin5) Name() string { + return Plugin5Name +} diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go new file mode 100644 index 00000000..76f2d6e8 --- /dev/null +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -0,0 +1,64 @@ +package plugins + +import ( + "fmt" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const Plugin6Name = "plugin6" + +type Plugin6 struct { + log logger.Logger + b broadcast.Broadcaster + driver pubsub.SubReader +} + +func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { + p.log = log + p.b = b + return nil +} + +func (p *Plugin6) Serve() chan error { + errCh := make(chan error, 1) + + var err error + p.driver, err = p.b.GetDriver("test") + if err != nil { + panic(err) + } + + err = p.driver.Subscribe("6", "foo") + if err != nil { + panic(err) + } + + go func() { + for { + msg, err := p.driver.Next() + if err != nil { + panic(err) + } + + if msg == nil { + continue + } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + } + }() + + return errCh +} + +func (p *Plugin6) Stop() error { + _ = p.driver.Unsubscribe("6", "foo") + return nil +} + +func (p *Plugin6) Name() string { + return Plugin6Name +} diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 24b66ae1..1e466e06 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -12,7 +12,6 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" @@ -21,6 +20,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + payload "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/stretchr/testify/assert" ) diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml index 896cee05..e6c43857 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml @@ -9,7 +9,7 @@ server: relay_timeout: "20s" http: - address: 127.0.0.1:11113 + address: 127.0.0.1:41278 max_request_size: 1024 middleware: [ "websockets" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] @@ -23,8 +23,13 @@ redis: addrs: - "localhost:6379" +broadcast: + test: + driver: memory + websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml new file mode 100644 index 00000000..d537a80b --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml @@ -0,0 +1,44 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-ok.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:41270 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +broadcast: + test: + driver: redis + addrs: + - "localhost:6379" + +websockets: + broker: test + allowed_origin: "*" + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml index fd125794..ada23845 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml @@ -19,9 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s +broadcast: + test1: + driver: no websockets: - pubsubs: [ "redis", "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny.yaml index e3bf5218..594a746d 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-deny.yaml @@ -9,7 +9,7 @@ server: relay_timeout: "20s" http: - address: 127.0.0.1:11112 + address: 127.0.0.1:15587 max_request_size: 1024 middleware: [ "websockets" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] @@ -19,12 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml new file mode 100644 index 00000000..4deea30a --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml @@ -0,0 +1,40 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-deny.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:15588 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +broadcast: + test: + driver: redis + addrs: + - "localhost:6379" + +websockets: + broker: test + allowed_origin: "*" + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml index dc073be3..14472f8a 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -23,12 +23,16 @@ redis: addrs: - "localhost:6379" +broadcast: + default: + driver: memory + websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - # pubsubs might use general config section or its own - pubsubs: [ "redis" ] + broker: default + allowed_origin: "*" path: "/ws" + logs: mode: development level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml index 27eab557..3557f5f1 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml @@ -19,13 +19,17 @@ http: allocate_timeout: 60s destroy_timeout: 60s - -websockets: - pubsubs: [ "redis", "memory" ] - redis: +redis: addrs: - "localhost:6379" +broadcast: + test: + driver: redis + +websockets: + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-stop.yaml index 0614f4e7..5377aef2 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-stop.yaml @@ -19,12 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 8321297d..29bf28be 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -16,7 +16,7 @@ import ( json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -25,6 +25,7 @@ import ( rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/assert" ) @@ -47,6 +48,7 @@ func TestBroadcastInit(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -98,52 +100,20 @@ func TestBroadcastInit(t *testing.T) { time.Sleep(time.Second * 1) t.Run("TestWSInit", wsInit) + t.Run("RPCWsMemoryPubAsync", RPCWsPubAsync("11111")) + t.Run("RPCWsMemory", RPCWsPub("11111")) stopCh <- struct{}{} wg.Wait() } -func wsInit(t *testing.T) { - da := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 20, - } - - connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} - - c, resp, err := da.Dial(connURL.String(), nil) - assert.NoError(t, err) - - defer func() { - _ = resp.Body.Close() - }() - - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) - - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) - assert.NoError(t, err) -} - -func TestWSRedisAndMemory(t *testing.T) { +func TestWSRedis(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-redis-memory.yaml", + Path: "configs/.rr-websockets-redis.yaml", Prefix: "rr", } @@ -155,7 +125,7 @@ func TestWSRedisAndMemory(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -205,21 +175,20 @@ func TestWSRedisAndMemory(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) - t.Run("RPCWsMemory", RPCWsMemory) - t.Run("RPCWsRedis", RPCWsRedis) + t.Run("RPCWsRedisPubAsync", RPCWsPubAsync("13235")) + t.Run("RPCWsRedisPub", RPCWsPub("13235")) stopCh <- struct{}{} wg.Wait() } -func TestWSRedisAndMemoryGlobal(t *testing.T) { +func TestWSRedisNoSection(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-redis-memory.yaml", + Path: "configs/.rr-websockets-broker-no-section.yaml", Prefix: "rr", } @@ -231,7 +200,37 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &broadcast.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + assert.Error(t, err) +} + +func TestWSDeny(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-deny.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -281,21 +280,19 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) - t.Run("RPCWsMemory", RPCWsMemory) - t.Run("RPCWsRedis", RPCWsRedis) + t.Run("RPCWsMemoryDeny", RPCWsDeny("15587")) stopCh <- struct{}{} wg.Wait() } -func TestWSRedisNoSection(t *testing.T) { +func TestWSDeny2(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-redis-no-section.yaml", + Path: "configs/.rr-websockets-deny2.yaml", Prefix: "rr", } @@ -304,10 +301,10 @@ func TestWSRedisNoSection(t *testing.T) { &rpcPlugin.Plugin{}, &logger.ZapLogger{}, &server.Plugin{}, - &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, + &redis.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -316,234 +313,60 @@ func TestWSRedisNoSection(t *testing.T) { t.Fatal(err) } - _, err = cont.Serve() - assert.Error(t, err) -} - -func RPCWsMemoryPubAsync(t *testing.T) { - da := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 20, - } - - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} - - c, resp, err := da.Dial(connURL.String(), nil) - assert.NoError(t, err) - - defer func() { - _ = resp.Body.Close() - }() - - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) - - publishAsync(t, "", "memory", "foo") - - // VERIFY a makeMessage - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) - - // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + ch, err := cont.Serve() if err != nil { - panic(err) + t.Fatal(err) } - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + wg := &sync.WaitGroup{} + wg.Add(1) - // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC - publishAsync(t, "", "memory", "foo") + stopCh := make(chan struct{}, 1) go func() { - time.Sleep(time.Second * 5) - publishAsync2(t, "", "memory", "foo2") - }() - - // should be only makeMessage from the subscribed foo2 topic - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) - - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) - assert.NoError(t, err) -} - -func RPCWsMemory(t *testing.T) { - da := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 20, - } - - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} - - c, resp, err := da.Dial(connURL.String(), nil) - assert.NoError(t, err) - - defer func() { - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) - - publish("", "memory", "foo") - - // VERIFY a makeMessage - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) - - // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) - - // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC - publish("", "memory", "foo") - - go func() { - time.Sleep(time.Second * 5) - publish2(t, "", "memory", "foo2") - }() - - // should be only makeMessage from the subscribed foo2 topic - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) - - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) - assert.NoError(t, err) -} - -func RPCWsRedis(t *testing.T) { - da := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 20, - } - - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} - - c, resp, err := da.Dial(connURL.String(), nil) - assert.NoError(t, err) - - defer func() { - _ = resp.Body.Close() - }() - - d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) - - publish("", "redis", "foo") - - // VERIFY a makeMessage - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) - - // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) - - // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC - publish("", "redis", "foo") - - go func() { - time.Sleep(time.Second * 5) - publish2(t, "", "redis", "foo2") - }() + time.Sleep(time.Second * 1) + t.Run("RPCWsRedisDeny", RPCWsDeny("15588")) - // should be only makeMessage from the subscribed foo2 topic - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) + stopCh <- struct{}{} - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) - assert.NoError(t, err) + wg.Wait() } -func TestWSMemoryDeny(t *testing.T) { +func TestWSStop(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-memory-deny.yaml", + Path: "configs/.rr-websockets-stop.yaml", Prefix: "rr", } @@ -556,6 +379,7 @@ func TestWSMemoryDeny(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -605,73 +429,40 @@ func TestWSMemoryDeny(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryDeny", RPCWsMemoryDeny) + t.Run("RPCWsStop", RPCWsMemoryStop("11114")) stopCh <- struct{}{} wg.Wait() } -func RPCWsMemoryDeny(t *testing.T) { - da := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 20, - } - - connURL := url.URL{Scheme: "ws", Host: "localhost:11112", Path: "/ws"} +func RPCWsMemoryStop(port string) func(t *testing.T) { + return func(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } - c, resp, err := da.Dial(connURL.String(), nil) - assert.NoError(t, err) - assert.NotNil(t, c) - assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} - defer func() { - if resp != nil && resp.Body != nil { + c, resp, err := da.Dial(connURL.String(), nil) + assert.NotNil(t, resp) + assert.Error(t, err) + assert.Nil(t, c) + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck + assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck + if resp != nil && resp.Body != nil { //nolint:staticcheck _ = resp.Body.Close() } - }() - - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) - if err != nil { - panic(err) } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err := c.ReadMessage() - retMsg := utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) - - // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) - if err != nil { - panic(err) - } - - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) - - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) - - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) - assert.NoError(t, err) } -func TestWSMemoryStop(t *testing.T) { +func TestWSAllow(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-memory-stop.yaml", + Path: "configs/.rr-websockets-allow.yaml", Prefix: "rr", } @@ -684,6 +475,7 @@ func TestWSMemoryStop(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -733,38 +525,19 @@ func TestWSMemoryStop(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryStop", RPCWsMemoryStop) + t.Run("RPCWsMemoryAllow", RPCWsPub("41278")) stopCh <- struct{}{} wg.Wait() } -func RPCWsMemoryStop(t *testing.T) { - da := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: time.Second * 20, - } - - connURL := url.URL{Scheme: "ws", Host: "localhost:11114", Path: "/ws"} - - c, resp, err := da.Dial(connURL.String(), nil) - assert.NotNil(t, resp) - assert.Error(t, err) - assert.Nil(t, c) - assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck - assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck - if resp != nil && resp.Body != nil { //nolint:staticcheck - _ = resp.Body.Close() - } -} - -func TestWSMemoryOk(t *testing.T) { +func TestWSAllow2(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-memory-allow.yaml", + Path: "configs/.rr-websockets-allow2.yaml", Prefix: "rr", } @@ -777,6 +550,7 @@ func TestWSMemoryOk(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -826,33 +600,29 @@ func TestWSMemoryOk(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryAllow", RPCWsMemoryAllow) + t.Run("RPCWsMemoryAllow", RPCWsPub("41270")) stopCh <- struct{}{} wg.Wait() } -func RPCWsMemoryAllow(t *testing.T) { +func wsInit(t *testing.T) { da := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:11113", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) - assert.NotNil(t, c) - assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) defer func() { - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } + _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -867,64 +637,219 @@ func RPCWsMemoryAllow(t *testing.T) { // subscription done assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) - publish("", "memory", "foo") - - // VERIFY a makeMessage - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) +} - // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) - if err != nil { - panic(err) - } +func RPCWsPubAsync(port string) func(t *testing.T) { + return func(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 18, + } - err = c.WriteMessage(websocket.BinaryMessage, d) - assert.NoError(t, err) + connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) - // subscription done - assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + defer func() { + _ = resp.Body.Close() + }() - // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC - publish("", "memory", "foo") + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } - go func() { - time.Sleep(time.Second * 5) - publish2(t, "", "memory", "foo2") - }() + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) - // should be only makeMessage from the subscribed foo2 topic - _, msg, err = c.ReadMessage() - retMsg = utils.AsString(msg) - assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) - err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) - assert.NoError(t, err) + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publishAsync(t, "placeholder", "foo") + + // VERIFY a makeMessage + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) + + // //// LEAVE foo ///////// + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publishAsync(t, "placeholder", "foo") + + go func() { + time.Sleep(time.Second * 3) + publishAsync(t, "placeholder", "foo2") + }() + + // should be only makeMessage from the subscribed foo0 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) + } } -func publish(command string, broker string, topics ...string) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - if err != nil { - panic(err) +func RPCWsPub(port string) func(t *testing.T) { + return func(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publish("", "foo") + + // VERIFY a makeMessage + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2(t, "", "foo2") + }() + + // should be only makeMessage from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) } +} - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) +func RPCWsDeny(port string) func(t *testing.T) { + return func(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } - ret := &websocketsv1.Response{} - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) - if err != nil { - panic(err) + connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + assert.NotNil(t, c) + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) } } -func publishAsync(t *testing.T, command string, broker string, topics ...string) { +// --------------------------------------------------------------------------------------------------- + +func publish(command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -933,12 +858,13 @@ func publishAsync(t *testing.T, command string, broker string, topics ...string) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) - assert.NoError(t, err) - assert.True(t, ret.Ok) + err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret) + if err != nil { + panic(err) + } } -func publishAsync2(t *testing.T, command string, broker string, topics ...string) { +func publishAsync(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -947,12 +873,12 @@ func publishAsync2(t *testing.T, command string, broker string, topics ...string client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func publish2(t *testing.T, command string, broker string, topics ...string) { +func publish2(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -961,27 +887,25 @@ func publish2(t *testing.T, command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { +func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message { return &websocketsv1.Message{ Topics: topics, Command: command, - Broker: broker, Payload: payload, } } -func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request { +func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request { m := &websocketsv1.Request{ Messages: []*websocketsv1.Message{ { Topics: topics, Command: command, - Broker: broker, Payload: payload, }, }, |