summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/pubsub')
-rw-r--r--plugins/redis/pubsub/channel.go4
-rw-r--r--plugins/redis/pubsub/pubsub.go10
2 files changed, 10 insertions, 4 deletions
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go
index eef5a7b9..a1655ab2 100644
--- a/plugins/redis/pubsub/channel.go
+++ b/plugins/redis/pubsub/channel.go
@@ -92,6 +92,6 @@ func (r *redisChannel) stop() error {
return nil
}
-func (r *redisChannel) message() *pubsub.Message {
- return <-r.out
+func (r *redisChannel) message() chan *pubsub.Message {
+ return r.out
}
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 95a9f6dd..c9ad3d58 100644
--- a/plugins/redis/pubsub/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- return p.channel.message(), nil
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("redis_driver_next")
+ select {
+ case msg := <-p.channel.message():
+ return msg, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
}