summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub/pubsub.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
committerValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
commit300560b44451bd9d5241ccdbaea3576760968ef2 (patch)
tree88d7d862707ae135f8c345e59111f2b2b9dff60f /plugins/redis/pubsub/pubsub.go
parent65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff)
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/pubsub/pubsub.go')
-rw-r--r--plugins/redis/pubsub/pubsub.go10
1 files changed, 8 insertions, 2 deletions
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 95a9f6dd..c9ad3d58 100644
--- a/plugins/redis/pubsub/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- return p.channel.message(), nil
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("redis_driver_next")
+ select {
+ case msg := <-p.channel.message():
+ return msg, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
}