diff options
Diffstat (limited to 'plugins/memory/pubsub.go')
-rw-r--r-- | plugins/memory/pubsub.go | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index c79f3eb0..fd30eb54 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -1,8 +1,10 @@ package memory import ( + "context" "sync" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } } -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - msg := <-p.pushCh - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("pubsub_memory") + select { + case msg := <-p.pushCh: + if msg == nil { + return nil, nil + } - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil + p.RLock() + defer p.RUnlock() + // push only messages, which topics are subscibed + // TODO better??? + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(msg.Topic); ok { + return msg, nil + } + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) } return nil, nil |