diff options
Diffstat (limited to 'tests/plugins/broadcast')
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 36 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin1.go | 31 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin2.go | 31 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin3.go | 33 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin4.go | 34 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin5.go | 34 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin6.go | 34 |
7 files changed, 129 insertions, 104 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index a78b17e1..3dcc5c2c 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -1,6 +1,7 @@ package broadcast import ( + "context" "net" "net/rpc" "os" @@ -10,6 +11,7 @@ import ( "testing" "time" + goRedis "github.com/go-redis/redis/v8" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" @@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) @@ -274,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) time.Sleep(time.Second * 5) @@ -283,9 +291,17 @@ func TestBroadcastSameSubscriber(t *testing.T) { stopCh <- struct{}{} wg.Wait() + + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) @@ -384,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { time.Sleep(time.Second * 2) t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + time.Sleep(time.Second) t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + time.Sleep(time.Second) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) time.Sleep(time.Second * 4) @@ -393,7 +412,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) + + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379")) + t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378")) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { @@ -446,6 +469,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) { } } } + func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:"+port) @@ -475,3 +499,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request { return m } + +func redisFlushAll(addr string) func(t *testing.T) { + return func(t *testing.T) { + rdb := goRedis.NewClient(&goRedis.Options{ + Addr: addr, + Password: "", // no password set + DB: 0, // use default DB + }) + + rdb.FlushAll(context.Background()) + } +} diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index 01ad1479..ed5139a8 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,14 +16,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +44,16 @@ func (p *Plugin1) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + errCh <- err + return } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) } }() @@ -68,8 +64,7 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index ee072ffe..20cc1b24 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,13 +16,14 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -40,22 +43,20 @@ func (p *Plugin2) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) } }() @@ -64,7 +65,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index 288201d1..2f416d2e 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin3) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) } }() @@ -66,7 +65,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index 56f79c0f..e2209648 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin4) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index e7cd7e60..122046b8 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin5) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 08272196..6ace0a79 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin6) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } |