diff options
author | Valery Piashchynski <[email protected]> | 2021-06-23 19:02:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-23 19:02:07 +0300 |
commit | 28f8182ab27dddbe9d432586aab83aa398432b16 (patch) | |
tree | ed4fc1122815cfe1fdeac643771f24980fc2bbea /tests/plugins/broadcast | |
parent | 521aeb823bc8fa1f0a91b540cbbac96328185f51 (diff) |
- Fix broadcast tests panic
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/broadcast')
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 21 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-global.yaml | 9 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml | 9 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin1.go | 27 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin2.go | 25 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin3.go | 27 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin4.go | 28 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin5.go | 28 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin6.go | 28 |
9 files changed, 128 insertions, 74 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 2cd4b451..31e6871d 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -278,14 +278,15 @@ func TestBroadcastSameSubscriber(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -298,11 +299,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -387,10 +388,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml index 2ca97055..ea25988c 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -38,9 +36,4 @@ broadcast: logs: mode: development - level: error - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml index 360e05e5..cbe18196 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -35,9 +33,4 @@ broadcast: logs: mode: development - level: debug - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index 390ba581..01ad1479 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -14,11 +14,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + } } }() @@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index 809020dc..ee072ffe 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -14,11 +14,13 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + exit chan struct{} } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + } } }() @@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index 4507a5b9..288201d1 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -14,11 +14,15 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + } } }() @@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index 6783855e..56f79c0f 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -14,11 +14,15 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index fade6b66..e7cd7e60 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -14,11 +14,15 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index d98a50b7..08272196 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -14,11 +14,15 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") + + p.exit <- struct{}{} return nil } |