diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
commit | 300560b44451bd9d5241ccdbaea3576760968ef2 (patch) | |
tree | 88d7d862707ae135f8c345e59111f2b2b9dff60f /plugins/redis/pubsub/pubsub.go | |
parent | 65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff) |
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/pubsub/pubsub.go')
-rw-r--r-- | plugins/redis/pubsub/pubsub.go | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go index 95a9f6dd..c9ad3d58 100644 --- a/plugins/redis/pubsub/pubsub.go +++ b/plugins/redis/pubsub/pubsub.go @@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return p.channel.message(), nil +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("redis_driver_next") + select { + case msg := <-p.channel.message(): + return msg, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } } |