summaryrefslogtreecommitdiff
path: root/tests/plugins/broadcast
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/broadcast')
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go36
-rw-r--r--tests/plugins/broadcast/plugins/plugin1.go31
-rw-r--r--tests/plugins/broadcast/plugins/plugin2.go31
-rw-r--r--tests/plugins/broadcast/plugins/plugin3.go33
-rw-r--r--tests/plugins/broadcast/plugins/plugin4.go34
-rw-r--r--tests/plugins/broadcast/plugins/plugin5.go34
-rw-r--r--tests/plugins/broadcast/plugins/plugin6.go34
7 files changed, 129 insertions, 104 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index a78b17e1..3dcc5c2c 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -1,6 +1,7 @@
package broadcast
import (
+ "context"
"net"
"net/rpc"
"os"
@@ -10,6 +11,7 @@ import (
"testing"
"time"
+ goRedis "github.com/go-redis/redis/v8"
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
@@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -274,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
time.Sleep(time.Second * 5)
@@ -283,9 +291,17 @@ func TestBroadcastSameSubscriber(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -384,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
time.Sleep(time.Second * 4)
@@ -393,7 +412,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
time.Sleep(time.Second * 5)
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
@@ -446,6 +469,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) {
}
}
}
+
func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
@@ -475,3 +499,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
return m
}
+
+func redisFlushAll(addr string) func(t *testing.T) {
+ return func(t *testing.T) {
+ rdb := goRedis.NewClient(&goRedis.Options{
+ Addr: addr,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ })
+
+ rdb.FlushAll(context.Background())
+ }
+}
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index 01ad1479..ed5139a8 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,14 +16,14 @@ type Plugin1 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +44,16 @@ func (p *Plugin1) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
-
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ errCh <- err
+ return
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
}
}()
@@ -68,8 +64,7 @@ func (p *Plugin1) Stop() error {
_ = p.driver.Unsubscribe("1", "foo")
_ = p.driver.Unsubscribe("1", "foo2")
_ = p.driver.Unsubscribe("1", "foo3")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index ee072ffe..20cc1b24 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,13 +16,14 @@ type Plugin2 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -40,22 +43,20 @@ func (p *Plugin2) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
}
}()
@@ -64,7 +65,7 @@ func (p *Plugin2) Serve() chan error {
func (p *Plugin2) Stop() error {
_ = p.driver.Unsubscribe("2", "foo")
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index 288201d1..2f416d2e 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin3 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin3) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
}
}()
@@ -66,7 +65,7 @@ func (p *Plugin3) Serve() chan error {
func (p *Plugin3) Stop() error {
_ = p.driver.Unsubscribe("3", "foo")
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index 56f79c0f..e2209648 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin4 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin4) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin4) Serve() chan error {
func (p *Plugin4) Stop() error {
_ = p.driver.Unsubscribe("4", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index e7cd7e60..122046b8 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin5 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin5) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin5) Serve() chan error {
func (p *Plugin5) Stop() error {
_ = p.driver.Unsubscribe("5", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index 08272196..6ace0a79 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin6 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin6) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin6) Serve() chan error {
func (p *Plugin6) Stop() error {
_ = p.driver.Unsubscribe("6", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}