diff options
Diffstat (limited to 'tests/plugins/broadcast')
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 513 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml | 32 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-global.yaml | 50 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-init.yaml | 36 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml | 29 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml | 48 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin1.go | 73 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin2.go | 74 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin3.go | 74 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin4.go | 74 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin5.go | 74 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin6.go | 74 |
12 files changed, 0 insertions, 1151 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go deleted file mode 100644 index 5d8c9ce9..00000000 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ /dev/null @@ -1,513 +0,0 @@ -package broadcast - -import ( - "context" - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - goRedis "github.com/go-redis/redis/v8" - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memory" - "github.com/spiral/roadrunner/v2/plugins/redis" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/websockets" - websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/spiral/roadrunner/v2/tests/plugins/broadcast/plugins" - "github.com/stretchr/testify/assert" -) - -func TestBroadcastInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-broadcast-init.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &broadcast.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &server.Plugin{}, - &redis.Plugin{}, - &websockets.Plugin{}, - &httpPlugin.Plugin{}, - &memory.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - stopCh <- struct{}{} - - wg.Wait() -} - -func TestBroadcastConfigError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-broadcast-config-error.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &broadcast.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &server.Plugin{}, - &redis.Plugin{}, - &websockets.Plugin{}, - &httpPlugin.Plugin{}, - &memory.Plugin{}, - - &plugins.Plugin1{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - assert.Error(t, err) -} - -func TestBroadcastNoConfig(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-broadcast-no-config.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", []string{}).MinTimes(1) - - err = cont.RegisterAll( - cfg, - &broadcast.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &server.Plugin{}, - &redis.Plugin{}, - &websockets.Plugin{}, - &httpPlugin.Plugin{}, - &memory.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - // should be just disabled - _, err = cont.Serve() - assert.NoError(t, err) -} - -func TestBroadcastSameSubscriber(t *testing.T) { - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) - - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-broadcast-same-section.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "plugins", []string{"broadcast"}).AnyTimes() - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) - mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3) - - err = cont.RegisterAll( - cfg, - &broadcast.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &server.Plugin{}, - &redis.Plugin{}, - &websockets.Plugin{}, - &httpPlugin.Plugin{}, - &memory.Plugin{}, - - // test - redis - // test2 - redis (port 6378) - // test3 - memory - // test4 - memory - &plugins.Plugin1{}, // foo, foo2, foo3 test - &plugins.Plugin2{}, // foo, test - &plugins.Plugin3{}, // foo, test2 - &plugins.Plugin4{}, // foo, test3 - &plugins.Plugin5{}, // foo, test4 - &plugins.Plugin6{}, // foo, test3 - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 2) - - t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) - time.Sleep(time.Second) - t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) - time.Sleep(time.Second) - t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) - time.Sleep(time.Second) - t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) - - time.Sleep(time.Second * 5) - - stopCh <- struct{}{} - - wg.Wait() - - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) - - time.Sleep(time.Second * 5) -} - -func TestBroadcastSameSubscriberGlobal(t *testing.T) { - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) - - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-broadcast-global.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "plugins", []string{"broadcast"}).AnyTimes() - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) - mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3) - - err = cont.RegisterAll( - cfg, - &broadcast.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &server.Plugin{}, - &redis.Plugin{}, - &websockets.Plugin{}, - &httpPlugin.Plugin{}, - &memory.Plugin{}, - - // test - redis - // test2 - redis (port 6378) - // test3 - memory - // test4 - memory - &plugins.Plugin1{}, // foo, foo2, foo3 test - &plugins.Plugin2{}, // foo, test - &plugins.Plugin3{}, // foo, test2 - &plugins.Plugin4{}, // foo, test3 - &plugins.Plugin5{}, // foo, test4 - &plugins.Plugin6{}, // foo, test3 - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 2) - - t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) - time.Sleep(time.Second) - t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) - time.Sleep(time.Second) - t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) - time.Sleep(time.Second) - t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) - - time.Sleep(time.Second * 4) - - stopCh <- struct{}{} - - wg.Wait() - - time.Sleep(time.Second * 5) - - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) - t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) -} - -func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:"+port) - if err != nil { - t.Fatal(err) - } - - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) - if err != nil { - t.Fatal(err) - } - } -} - -func BroadcastPublishFoo2(port string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:"+port) - if err != nil { - t.Fatal(err) - } - - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo"), ret) - if err != nil { - t.Fatal(err) - } - } -} - -func BroadcastPublishFoo3(port string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:"+port) - if err != nil { - t.Fatal(err) - } - - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo3"), ret) - if err != nil { - t.Fatal(err) - } - } -} - -func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:"+port) - if err != nil { - t.Fatal(err) - } - - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - ret := &websocketsv1.Response{} - err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) - if err != nil { - t.Fatal(err) - } - } -} - -func makeMessage(payload []byte, topics ...string) *websocketsv1.Request { - m := &websocketsv1.Request{ - Messages: []*websocketsv1.Message{ - { - Topics: topics, - Payload: payload, - }, - }, - } - - return m -} - -func redisFlushAll(addr string) func(t *testing.T) { - return func(t *testing.T) { - rdb := goRedis.NewClient(&goRedis.Options{ - Addr: addr, - Password: "", // no password set - DB: 0, // use default DB - }) - - rdb.FlushAll(context.Background()) - } -} diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml deleted file mode 100644 index 1474feb7..00000000 --- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml +++ /dev/null @@ -1,32 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../psr-worker-bench.php" - relay: "pipes" - relay_timeout: "20s" - -http: - address: 127.0.0.1:21345 - max_request_size: 1024 - middleware: [ "websockets" ] - trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] - pool: - num_workers: 2 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -# no global or local config -broadcast: - default: - driver: redis - -logs: - mode: development - level: debug - -endure: - grace_period: 120s - print_graph: false - log_level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml deleted file mode 100644 index a7f9c35d..00000000 --- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml +++ /dev/null @@ -1,50 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6003 - -server: - command: "php ../../psr-worker-bench.php" - relay: "pipes" - relay_timeout: "20s" - -http: - address: 127.0.0.1:21543 - max_request_size: 1024 - middleware: ["websockets"] - trusted_subnets: - [ - "10.0.0.0/8", - "127.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "::1/128", - "fc00::/7", - "fe80::/10", - ] - pool: - num_workers: 2 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -test: - addrs: - - "127.0.0.1:6379" - -broadcast: - test: - driver: redis - test2: - driver: redis - config: - addrs: - - "127.0.0.1:6378" - test3: - driver: memory - config: {} - test4: - driver: memory - config: {} - -logs: - mode: development - level: info diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml deleted file mode 100644 index 1cbebdd7..00000000 --- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml +++ /dev/null @@ -1,36 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../psr-worker-bench.php" - user: "" - group: "" - relay: "pipes" - relay_timeout: "20s" - -http: - address: 127.0.0.1:21345 - max_request_size: 1024 - middleware: [ "websockets" ] - trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] - pool: - num_workers: 2 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -broadcast: - default: - driver: redis - config: - addrs: - - "127.0.0.1:6379" - -logs: - mode: development - level: error - -endure: - grace_period: 120s - print_graph: false - log_level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml deleted file mode 100644 index 90790869..00000000 --- a/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml +++ /dev/null @@ -1,29 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../psr-worker-bench.php" - user: "" - group: "" - relay: "pipes" - relay_timeout: "20s" - -http: - address: 127.0.0.1:21345 - max_request_size: 1024 - middleware: [ "websockets" ] - trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] - pool: - num_workers: 2 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -logs: - mode: development - level: debug - -endure: - grace_period: 120s - print_graph: false - log_level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml deleted file mode 100644 index 85a767cb..00000000 --- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml +++ /dev/null @@ -1,48 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6002 - -server: - command: "php ../../psr-worker-bench.php" - relay: "pipes" - relay_timeout: "20s" - -http: - address: 127.0.0.1:21345 - max_request_size: 1024 - middleware: ["websockets"] - trusted_subnets: - [ - "10.0.0.0/8", - "127.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "::1/128", - "fc00::/7", - "fe80::/10", - ] - pool: - num_workers: 2 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -broadcast: - test: - driver: redis - config: - addrs: - - "127.0.0.1:6379" - test2: - driver: redis - config: - addrs: - - "127.0.0.1:6378" - test3: - driver: memory - config: {} - test4: - driver: memory - config: {} -logs: - mode: development - level: info diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go deleted file mode 100644 index ed5139a8..00000000 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ /dev/null @@ -1,73 +0,0 @@ -package plugins - -import ( - "context" - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const Plugin1Name = "plugin1" - -type Plugin1 struct { - log logger.Logger - b broadcast.Broadcaster - driver pubsub.SubReader - ctx context.Context - cancel context.CancelFunc -} - -func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { - p.log = log - p.b = b - p.ctx, p.cancel = context.WithCancel(context.Background()) - return nil -} - -func (p *Plugin1) Serve() chan error { - errCh := make(chan error, 1) - - var err error - p.driver, err = p.b.GetDriver("test") - if err != nil { - errCh <- err - return errCh - } - - err = p.driver.Subscribe("1", "foo", "foo2", "foo3") - if err != nil { - panic(err) - } - - go func() { - for { - msg, err := p.driver.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - errCh <- err - return - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) - } - }() - - return errCh -} - -func (p *Plugin1) Stop() error { - _ = p.driver.Unsubscribe("1", "foo") - _ = p.driver.Unsubscribe("1", "foo2") - _ = p.driver.Unsubscribe("1", "foo3") - p.cancel() - return nil -} - -func (p *Plugin1) Name() string { - return Plugin1Name -} diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go deleted file mode 100644 index 20cc1b24..00000000 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ /dev/null @@ -1,74 +0,0 @@ -package plugins - -import ( - "context" - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const Plugin2Name = "plugin2" - -type Plugin2 struct { - log logger.Logger - b broadcast.Broadcaster - driver pubsub.SubReader - ctx context.Context - cancel context.CancelFunc -} - -func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { - p.log = log - p.b = b - p.ctx, p.cancel = context.WithCancel(context.Background()) - return nil -} - -func (p *Plugin2) Serve() chan error { - errCh := make(chan error, 1) - - var err error - p.driver, err = p.b.GetDriver("test") - if err != nil { - panic(err) - } - - err = p.driver.Subscribe("2", "foo") - if err != nil { - panic(err) - } - - go func() { - for { - msg, err := p.driver.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - errCh <- err - return - } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) - } - }() - - return errCh -} - -func (p *Plugin2) Stop() error { - _ = p.driver.Unsubscribe("2", "foo") - p.cancel() - return nil -} - -func (p *Plugin2) Name() string { - return Plugin2Name -} diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go deleted file mode 100644 index 2f416d2e..00000000 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ /dev/null @@ -1,74 +0,0 @@ -package plugins - -import ( - "context" - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const Plugin3Name = "plugin3" - -type Plugin3 struct { - log logger.Logger - b broadcast.Broadcaster - driver pubsub.SubReader - ctx context.Context - cancel context.CancelFunc -} - -func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { - p.log = log - p.b = b - p.ctx, p.cancel = context.WithCancel(context.Background()) - return nil -} - -func (p *Plugin3) Serve() chan error { - errCh := make(chan error, 1) - - var err error - p.driver, err = p.b.GetDriver("test2") - if err != nil { - panic(err) - } - - err = p.driver.Subscribe("3", "foo") - if err != nil { - panic(err) - } - - go func() { - for { - msg, err := p.driver.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - errCh <- err - return - } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) - } - }() - - return errCh -} - -func (p *Plugin3) Stop() error { - _ = p.driver.Unsubscribe("3", "foo") - p.cancel() - return nil -} - -func (p *Plugin3) Name() string { - return Plugin3Name -} diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go deleted file mode 100644 index e2209648..00000000 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ /dev/null @@ -1,74 +0,0 @@ -package plugins - -import ( - "context" - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const Plugin4Name = "plugin4" - -type Plugin4 struct { - log logger.Logger - b broadcast.Broadcaster - driver pubsub.SubReader - ctx context.Context - cancel context.CancelFunc -} - -func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { - p.log = log - p.b = b - p.ctx, p.cancel = context.WithCancel(context.Background()) - return nil -} - -func (p *Plugin4) Serve() chan error { - errCh := make(chan error, 1) - - var err error - p.driver, err = p.b.GetDriver("test3") - if err != nil { - panic(err) - } - - err = p.driver.Subscribe("4", "foo") - if err != nil { - panic(err) - } - - go func() { - for { - msg, err := p.driver.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - errCh <- err - return - } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) - } - }() - - return errCh -} - -func (p *Plugin4) Stop() error { - _ = p.driver.Unsubscribe("4", "foo") - p.cancel() - return nil -} - -func (p *Plugin4) Name() string { - return Plugin4Name -} diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go deleted file mode 100644 index 122046b8..00000000 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ /dev/null @@ -1,74 +0,0 @@ -package plugins - -import ( - "context" - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const Plugin5Name = "plugin5" - -type Plugin5 struct { - log logger.Logger - b broadcast.Broadcaster - driver pubsub.SubReader - ctx context.Context - cancel context.CancelFunc -} - -func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { - p.log = log - p.b = b - p.ctx, p.cancel = context.WithCancel(context.Background()) - return nil -} - -func (p *Plugin5) Serve() chan error { - errCh := make(chan error, 1) - - var err error - p.driver, err = p.b.GetDriver("test4") - if err != nil { - panic(err) - } - - err = p.driver.Subscribe("5", "foo") - if err != nil { - panic(err) - } - - go func() { - for { - msg, err := p.driver.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - errCh <- err - return - } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) - } - }() - - return errCh -} - -func (p *Plugin5) Stop() error { - _ = p.driver.Unsubscribe("5", "foo") - p.cancel() - return nil -} - -func (p *Plugin5) Name() string { - return Plugin5Name -} diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go deleted file mode 100644 index 6ace0a79..00000000 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ /dev/null @@ -1,74 +0,0 @@ -package plugins - -import ( - "context" - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const Plugin6Name = "plugin6" - -type Plugin6 struct { - log logger.Logger - b broadcast.Broadcaster - driver pubsub.SubReader - ctx context.Context - cancel context.CancelFunc -} - -func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { - p.log = log - p.b = b - p.ctx, p.cancel = context.WithCancel(context.Background()) - return nil -} - -func (p *Plugin6) Serve() chan error { - errCh := make(chan error, 1) - - var err error - p.driver, err = p.b.GetDriver("test") - if err != nil { - panic(err) - } - - err = p.driver.Subscribe("6", "foo") - if err != nil { - panic(err) - } - - go func() { - for { - msg, err := p.driver.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - errCh <- err - return - } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) - } - }() - - return errCh -} - -func (p *Plugin6) Stop() error { - _ = p.driver.Unsubscribe("6", "foo") - p.cancel() - return nil -} - -func (p *Plugin6) Name() string { - return Plugin6Name -} |