summaryrefslogtreecommitdiff
path: root/tests/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-23 19:02:07 +0300
committerValery Piashchynski <[email protected]>2021-06-23 19:02:07 +0300
commit28f8182ab27dddbe9d432586aab83aa398432b16 (patch)
treeed4fc1122815cfe1fdeac643771f24980fc2bbea /tests/plugins/broadcast
parent521aeb823bc8fa1f0a91b540cbbac96328185f51 (diff)
- Fix broadcast tests panic
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/broadcast')
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go21
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-global.yaml9
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml9
-rw-r--r--tests/plugins/broadcast/plugins/plugin1.go27
-rw-r--r--tests/plugins/broadcast/plugins/plugin2.go25
-rw-r--r--tests/plugins/broadcast/plugins/plugin3.go27
-rw-r--r--tests/plugins/broadcast/plugins/plugin4.go28
-rw-r--r--tests/plugins/broadcast/plugins/plugin5.go28
-rw-r--r--tests/plugins/broadcast/plugins/plugin6.go28
9 files changed, 128 insertions, 74 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index 2cd4b451..31e6871d 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1)
- mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes()
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
- mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2)
mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
@@ -278,14 +278,15 @@ func TestBroadcastSameSubscriber(t *testing.T) {
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
- time.Sleep(time.Second * 4)
stopCh <- struct{}{}
wg.Wait()
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -298,11 +299,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1)
- mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes()
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
- mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2)
mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
@@ -387,10 +388,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
- time.Sleep(time.Second * 4)
stopCh <- struct{}{}
wg.Wait()
+ time.Sleep(time.Second * 5)
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
index 2ca97055..ea25988c 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
@@ -38,9 +36,4 @@ broadcast:
logs:
mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
+ level: info
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
index 360e05e5..cbe18196 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
@@ -35,9 +33,4 @@ broadcast:
logs:
mode: development
- level: debug
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
+ level: info
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index 390ba581..01ad1479 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -14,11 +14,14 @@ type Plugin1 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ }
}
}()
@@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error {
_ = p.driver.Unsubscribe("1", "foo")
_ = p.driver.Unsubscribe("1", "foo2")
_ = p.driver.Unsubscribe("1", "foo3")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index 809020dc..ee072ffe 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -14,11 +14,13 @@ type Plugin2 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+ exit chan struct{}
}
func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ }
}
}()
@@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error {
func (p *Plugin2) Stop() error {
_ = p.driver.Unsubscribe("2", "foo")
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index 4507a5b9..288201d1 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -14,11 +14,15 @@ type Plugin3 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ }
}
}()
@@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error {
func (p *Plugin3) Stop() error {
_ = p.driver.Unsubscribe("3", "foo")
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index 6783855e..56f79c0f 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -14,11 +14,15 @@ type Plugin4 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error {
func (p *Plugin4) Stop() error {
_ = p.driver.Unsubscribe("4", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index fade6b66..e7cd7e60 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -14,11 +14,15 @@ type Plugin5 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error {
func (p *Plugin5) Stop() error {
_ = p.driver.Unsubscribe("5", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index d98a50b7..08272196 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -14,11 +14,15 @@ type Plugin6 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error {
func (p *Plugin6) Stop() error {
_ = p.driver.Unsubscribe("6", "foo")
+
+ p.exit <- struct{}{}
return nil
}