diff options
author | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
commit | d0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch) | |
tree | 7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/memory | |
parent | 0323e070103cc2c30d2cdfb12719d753acafe151 (diff) |
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r-- | plugins/memory/plugin.go | 45 |
1 files changed, 33 insertions, 12 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index eb87b39e..6732ff5d 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -3,6 +3,7 @@ package memory import ( "sync" + "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" @@ -13,17 +14,19 @@ const ( ) type Plugin struct { + sync.RWMutex log logger.Logger // channel with the messages from the RPC pushCh chan []byte // user-subscribed topics - topics sync.Map + storage bst.Storage } func (p *Plugin) Init(log logger.Logger) error { p.log = log - p.pushCh = make(chan []byte, 100) + p.pushCh = make(chan []byte, 10) + p.storage = bst.NewBST() return nil } @@ -35,44 +38,62 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Publish(messages []byte) error { - p.pushCh <- messages +func (p *Plugin) Publish(message []byte) error { + p.pushCh <- message return nil } -func (p *Plugin) PublishAsync(messages []byte) { +func (p *Plugin) PublishAsync(message []byte) { go func() { - p.pushCh <- messages + p.pushCh <- message }() } -func (p *Plugin) Subscribe(topics ...string) error { +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() for i := 0; i < len(topics); i++ { - p.topics.Store(topics[i], struct{}{}) + p.storage.Insert(connectionID, topics[i]) } return nil } -func (p *Plugin) Unsubscribe(topics ...string) error { +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() for i := 0; i < len(topics); i++ { - p.topics.Delete(topics[i]) + p.storage.Remove(connectionID, topics[i]) } return nil } +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + p.RLock() + defer p.RUnlock() + + ret := p.storage.Get(topic) + for rr := range ret { + res[rr] = struct{}{} + } +} + func (p *Plugin) Next() (*message.Message, error) { msg := <-p.pushCh - if msg == nil { return nil, nil } + p.RLock() + defer p.RUnlock() + fbsMsg := message.GetRootAsMessage(msg, 0) // push only messages, which are subscribed // TODO better??? for i := 0; i < fbsMsg.TopicsLength(); i++ { - if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok { + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok { return fbsMsg, nil } } |