summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 18:41:54 +0300
committerValery Piashchynski <[email protected]>2021-06-08 18:41:54 +0300
commita8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (patch)
treee7f43f625836456104bc0c39227b71e5e3cf848a /plugins/memory
parent47c40407a7ca5f1391f4d3d504d0def166eac4e9 (diff)
- Move ws memory pub-sub plugin into the websockets folder
- Update CHANGELOG Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/plugin.go105
1 files changed, 0 insertions, 105 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
deleted file mode 100644
index d724dff9..00000000
--- a/plugins/memory/plugin.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package memory
-
-import (
- "sync"
-
- "github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-)
-
-const (
- PluginName string = "memory"
-)
-
-type Plugin struct {
- sync.RWMutex
- log logger.Logger
-
- // channel with the messages from the RPC
- pushCh chan []byte
- // user-subscribed topics
- storage bst.Storage
-}
-
-func (p *Plugin) Init(log logger.Logger) error {
- p.log = log
- p.pushCh = make(chan []byte, 10)
- p.storage = bst.NewBST()
- return nil
-}
-
-// Available interface implementation for the plugin
-func (p *Plugin) Available() {}
-
-// Name is endure.Named interface implementation
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Publish(message []byte) error {
- p.pushCh <- message
- return nil
-}
-
-func (p *Plugin) PublishAsync(message []byte) {
- go func() {
- p.pushCh <- message
- }()
-}
-
-func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Insert(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Remove(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *Plugin) Connections(topic string, res map[string]struct{}) {
- p.RLock()
- defer p.RUnlock()
-
- ret := p.storage.Get(topic)
- for rr := range ret {
- res[rr] = struct{}{}
- }
-}
-
-func (p *Plugin) Next() (*message.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
-
- m := &message.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return nil, err
- }
-
- // push only messages, which are subscribed
- // TODO better???
- for i := 0; i < len(m.GetTopics()); i++ {
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(m.GetTopics()[i]); ok {
- return m, nil
- }
- }
- return nil, nil
-}