summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-20 16:40:14 +0300
committerValery Piashchynski <[email protected]>2021-06-20 16:40:14 +0300
commit2dd30155de6faaf6005027d5337a840310c827f9 (patch)
treeaa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/memory
parent25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff)
- Update redis/memory pubsubs
- Rework internal message bus - Add new tests for the broadcast plugin and include them into the GA Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/pubsub.go35
1 files changed, 13 insertions, 22 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index 87638bd8..d027a8a5 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -6,14 +6,12 @@ import (
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
sync.RWMutex
// channel with the messages from the RPC
- pushCh chan []byte
+ pushCh chan *pubsub.Message
// user-subscribed topics
storage bst.Storage
log logger.Logger
@@ -21,21 +19,21 @@ type PubSubDriver struct {
func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) {
ps := &PubSubDriver{
- pushCh: make(chan []byte, 10),
+ pushCh: make(chan *pubsub.Message, 10),
storage: bst.NewBST(),
log: log,
}
return ps, nil
}
-func (p *PubSubDriver) Publish(message []byte) error {
- p.pushCh <- message
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+ p.pushCh <- msg
return nil
}
-func (p *PubSubDriver) PublishAsync(message []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
- p.pushCh <- message
+ p.pushCh <- msg
}()
}
@@ -67,7 +65,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -76,20 +74,13 @@ func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
p.RLock()
defer p.RUnlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return nil, err
- }
-
- // push only messages, which are subscribed
+ // push only messages, which topics are subscibed
// TODO better???
- for i := 0; i < len(m.GetTopics()); i++ {
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(m.GetTopics()[i]); ok {
- return m, nil
- }
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
}
+
return nil, nil
}