summaryrefslogtreecommitdiff
path: root/plugins/memory/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/pubsub.go')
-rw-r--r--plugins/memory/pubsub.go34
1 files changed, 20 insertions, 14 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index c79f3eb0..fd30eb54 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -1,8 +1,10 @@
package memory
import (
+ "context"
"sync"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("pubsub_memory")
+ select {
+ case msg := <-p.pushCh:
+ if msg == nil {
+ return nil, nil
+ }
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
+ p.RLock()
+ defer p.RUnlock()
+ // push only messages, which topics are subscibed
+ // TODO better???
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
+ }
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
}
return nil, nil