summaryrefslogtreecommitdiff
path: root/plugins/memory/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 00:10:31 +0300
committerGitHub <[email protected]>2021-06-01 00:10:31 +0300
commit548ee4432e48b316ada00feec1a6b89e67ae4f2f (patch)
tree5cd2aaeeafdb50e3e46824197c721223f54695bf /plugins/memory/plugin.go
parent8cd696bbca8fac2ced30d8172c41b7434ec86650 (diff)
parentdf4d316d519cea6dff654bd917521a616a37f769 (diff)
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r--plugins/memory/plugin.go81
1 files changed, 81 insertions, 0 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
new file mode 100644
index 00000000..49c187bc
--- /dev/null
+++ b/plugins/memory/plugin.go
@@ -0,0 +1,81 @@
+package memory
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+
+ // channel with the messages from the RPC
+ pushCh chan *pubsub.Message
+ // user-subscribed topics
+ topics sync.Map
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pushCh = make(chan *pubsub.Message, 100)
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Publish(messages []*pubsub.Message) error {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(messages []*pubsub.Message) {
+ go func() {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ for i := 0; i < len(topics); i++ {
+ p.topics.Store(topics[i], struct{}{})
+ }
+ return nil
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ for i := 0; i < len(topics); i++ {
+ p.topics.Delete(topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Next() (*pubsub.Message, error) {
+ msg := <-p.pushCh
+
+ if msg == nil {
+ return nil, nil
+ }
+
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(msg.Topics); i++ {
+ if _, ok := p.topics.Load(msg.Topics[i]); ok {
+ return msg, nil
+ }
+ }
+ return nil, nil
+}