diff options
Diffstat (limited to 'plugins/redis/pubsub')
-rw-r--r-- | plugins/redis/pubsub/channel.go | 4 | ||||
-rw-r--r-- | plugins/redis/pubsub/pubsub.go | 10 |
2 files changed, 10 insertions, 4 deletions
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go index eef5a7b9..a1655ab2 100644 --- a/plugins/redis/pubsub/channel.go +++ b/plugins/redis/pubsub/channel.go @@ -92,6 +92,6 @@ func (r *redisChannel) stop() error { return nil } -func (r *redisChannel) message() *pubsub.Message { - return <-r.out +func (r *redisChannel) message() chan *pubsub.Message { + return r.out } diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go index 95a9f6dd..c9ad3d58 100644 --- a/plugins/redis/pubsub/pubsub.go +++ b/plugins/redis/pubsub/pubsub.go @@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return p.channel.message(), nil +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("redis_driver_next") + select { + case msg := <-p.channel.message(): + return msg, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } } |