summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
committerValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
commit300560b44451bd9d5241ccdbaea3576760968ef2 (patch)
tree88d7d862707ae135f8c345e59111f2b2b9dff60f /tests
parent65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff)
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests')
-rw-r--r--tests/plugins/broadcast/plugins/plugin1.go31
-rw-r--r--tests/plugins/broadcast/plugins/plugin2.go31
-rw-r--r--tests/plugins/broadcast/plugins/plugin3.go33
-rw-r--r--tests/plugins/broadcast/plugins/plugin4.go34
-rw-r--r--tests/plugins/broadcast/plugins/plugin5.go34
-rw-r--r--tests/plugins/broadcast/plugins/plugin6.go34
6 files changed, 93 insertions, 104 deletions
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index 01ad1479..ed5139a8 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,14 +16,14 @@ type Plugin1 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +44,16 @@ func (p *Plugin1) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
-
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ errCh <- err
+ return
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
}
}()
@@ -68,8 +64,7 @@ func (p *Plugin1) Stop() error {
_ = p.driver.Unsubscribe("1", "foo")
_ = p.driver.Unsubscribe("1", "foo2")
_ = p.driver.Unsubscribe("1", "foo3")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index ee072ffe..20cc1b24 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,13 +16,14 @@ type Plugin2 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -40,22 +43,20 @@ func (p *Plugin2) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
}
}()
@@ -64,7 +65,7 @@ func (p *Plugin2) Serve() chan error {
func (p *Plugin2) Stop() error {
_ = p.driver.Unsubscribe("2", "foo")
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index 288201d1..2f416d2e 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin3 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin3) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
}
}()
@@ -66,7 +65,7 @@ func (p *Plugin3) Serve() chan error {
func (p *Plugin3) Stop() error {
_ = p.driver.Unsubscribe("3", "foo")
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index 56f79c0f..e2209648 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin4 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin4) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin4) Serve() chan error {
func (p *Plugin4) Stop() error {
_ = p.driver.Unsubscribe("4", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index e7cd7e60..122046b8 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin5 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin5) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin5) Serve() chan error {
func (p *Plugin5) Stop() error {
_ = p.driver.Unsubscribe("5", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index 08272196..6ace0a79 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin6 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin6) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin6) Serve() chan error {
func (p *Plugin6) Stop() error {
_ = p.driver.Unsubscribe("6", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}