diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
commit | 300560b44451bd9d5241ccdbaea3576760968ef2 (patch) | |
tree | 88d7d862707ae135f8c345e59111f2b2b9dff60f /tests | |
parent | 65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff) |
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin1.go | 31 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin2.go | 31 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin3.go | 33 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin4.go | 34 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin5.go | 34 | ||||
-rw-r--r-- | tests/plugins/broadcast/plugins/plugin6.go | 34 |
6 files changed, 93 insertions, 104 deletions
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index 01ad1479..ed5139a8 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,14 +16,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +44,16 @@ func (p *Plugin1) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + errCh <- err + return } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) } }() @@ -68,8 +64,7 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index ee072ffe..20cc1b24 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,13 +16,14 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -40,22 +43,20 @@ func (p *Plugin2) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) } }() @@ -64,7 +65,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index 288201d1..2f416d2e 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin3) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) } }() @@ -66,7 +65,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index 56f79c0f..e2209648 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin4) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index e7cd7e60..122046b8 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin5) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 08272196..6ace0a79 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -1,8 +1,10 @@ package plugins import ( + "context" "fmt" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,15 +16,14 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader - - exit chan struct{} + ctx context.Context + cancel context.CancelFunc } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b - - p.exit = make(chan struct{}, 1) + p.ctx, p.cancel = context.WithCancel(context.Background()) return nil } @@ -42,22 +43,20 @@ func (p *Plugin6) Serve() chan error { go func() { for { - select { - case <-p.exit: - return - default: - msg, err := p.driver.Next() - if err != nil { - errCh <- err + msg, err := p.driver.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } + errCh <- err + return + } - if msg == nil { - continue - } - - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + if msg == nil { + continue } + + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) } }() @@ -66,8 +65,7 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") - - p.exit <- struct{}{} + p.cancel() return nil } |