diff options
author | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
commit | 2dd30155de6faaf6005027d5337a840310c827f9 (patch) | |
tree | aa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/memory | |
parent | 25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff) |
- Update redis/memory pubsubs
- Rework internal message bus
- Add new tests for the broadcast plugin and include them into the GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r-- | plugins/memory/pubsub.go | 35 |
1 files changed, 13 insertions, 22 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index 87638bd8..d027a8a5 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -6,14 +6,12 @@ import ( "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" - websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "google.golang.org/protobuf/proto" ) type PubSubDriver struct { sync.RWMutex // channel with the messages from the RPC - pushCh chan []byte + pushCh chan *pubsub.Message // user-subscribed topics storage bst.Storage log logger.Logger @@ -21,21 +19,21 @@ type PubSubDriver struct { func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) { ps := &PubSubDriver{ - pushCh: make(chan []byte, 10), + pushCh: make(chan *pubsub.Message, 10), storage: bst.NewBST(), log: log, } return ps, nil } -func (p *PubSubDriver) Publish(message []byte) error { - p.pushCh <- message +func (p *PubSubDriver) Publish(msg *pubsub.Message) error { + p.pushCh <- msg return nil } -func (p *PubSubDriver) PublishAsync(message []byte) { +func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { go func() { - p.pushCh <- message + p.pushCh <- msg }() } @@ -67,7 +65,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } } -func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { +func (p *PubSubDriver) Next() (*pubsub.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil @@ -76,20 +74,13 @@ func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { p.RLock() defer p.RUnlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return nil, err - } - - // push only messages, which are subscribed + // push only messages, which topics are subscibed // TODO better??? - for i := 0; i < len(m.GetTopics()); i++ { - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(m.GetTopics()[i]); ok { - return m, nil - } + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(msg.Topic); ok { + return msg, nil } + return nil, nil } |