summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
committerValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
commit300560b44451bd9d5241ccdbaea3576760968ef2 (patch)
tree88d7d862707ae135f8c345e59111f2b2b9dff60f /plugins/memory
parent65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff)
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/pubsub.go34
1 files changed, 20 insertions, 14 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index c79f3eb0..fd30eb54 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -1,8 +1,10 @@
package memory
import (
+ "context"
"sync"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("pubsub_memory")
+ select {
+ case msg := <-p.pushCh:
+ if msg == nil {
+ return nil, nil
+ }
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
+ p.RLock()
+ defer p.RUnlock()
+ // push only messages, which topics are subscibed
+ // TODO better???
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
+ }
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
}
return nil, nil