summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
committerValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
commitd0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch)
tree7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/memory
parent0323e070103cc2c30d2cdfb12719d753acafe151 (diff)
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/plugin.go45
1 files changed, 33 insertions, 12 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index eb87b39e..6732ff5d 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -3,6 +3,7 @@ package memory
import (
"sync"
+ "github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
@@ -13,17 +14,19 @@ const (
)
type Plugin struct {
+ sync.RWMutex
log logger.Logger
// channel with the messages from the RPC
pushCh chan []byte
// user-subscribed topics
- topics sync.Map
+ storage bst.Storage
}
func (p *Plugin) Init(log logger.Logger) error {
p.log = log
- p.pushCh = make(chan []byte, 100)
+ p.pushCh = make(chan []byte, 10)
+ p.storage = bst.NewBST()
return nil
}
@@ -35,44 +38,62 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Publish(messages []byte) error {
- p.pushCh <- messages
+func (p *Plugin) Publish(message []byte) error {
+ p.pushCh <- message
return nil
}
-func (p *Plugin) PublishAsync(messages []byte) {
+func (p *Plugin) PublishAsync(message []byte) {
go func() {
- p.pushCh <- messages
+ p.pushCh <- message
}()
}
-func (p *Plugin) Subscribe(topics ...string) error {
+func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
for i := 0; i < len(topics); i++ {
- p.topics.Store(topics[i], struct{}{})
+ p.storage.Insert(connectionID, topics[i])
}
return nil
}
-func (p *Plugin) Unsubscribe(topics ...string) error {
+func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
for i := 0; i < len(topics); i++ {
- p.topics.Delete(topics[i])
+ p.storage.Remove(connectionID, topics[i])
}
return nil
}
+func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+ p.RLock()
+ defer p.RUnlock()
+
+ ret := p.storage.Get(topic)
+ for rr := range ret {
+ res[rr] = struct{}{}
+ }
+}
+
func (p *Plugin) Next() (*message.Message, error) {
msg := <-p.pushCh
-
if msg == nil {
return nil, nil
}
+ p.RLock()
+ defer p.RUnlock()
+
fbsMsg := message.GetRootAsMessage(msg, 0)
// push only messages, which are subscribed
// TODO better???
for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok {
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok {
return fbsMsg, nil
}
}