diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 00:10:31 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-01 00:10:31 +0300 |
commit | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (patch) | |
tree | 5cd2aaeeafdb50e3e46824197c721223f54695bf /plugins/memory/plugin.go | |
parent | 8cd696bbca8fac2ced30d8172c41b7434ec86650 (diff) | |
parent | df4d316d519cea6dff654bd917521a616a37f769 (diff) |
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r-- | plugins/memory/plugin.go | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go new file mode 100644 index 00000000..49c187bc --- /dev/null +++ b/plugins/memory/plugin.go @@ -0,0 +1,81 @@ +package memory + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "memory" +) + +type Plugin struct { + log logger.Logger + + // channel with the messages from the RPC + pushCh chan *pubsub.Message + // user-subscribed topics + topics sync.Map +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pushCh = make(chan *pubsub.Message, 100) + return nil +} + +// Available interface implementation for the plugin +func (p *Plugin) Available() {} + +// Name is endure.Named interface implementation +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Publish(messages []*pubsub.Message) error { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + return nil +} + +func (p *Plugin) PublishAsync(messages []*pubsub.Message) { + go func() { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + }() +} + +func (p *Plugin) Subscribe(topics ...string) error { + for i := 0; i < len(topics); i++ { + p.topics.Store(topics[i], struct{}{}) + } + return nil +} + +func (p *Plugin) Unsubscribe(topics ...string) error { + for i := 0; i < len(topics); i++ { + p.topics.Delete(topics[i]) + } + return nil +} + +func (p *Plugin) Next() (*pubsub.Message, error) { + msg := <-p.pushCh + + if msg == nil { + return nil, nil + } + + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(msg.Topics); i++ { + if _, ok := p.topics.Load(msg.Topics[i]); ok { + return msg, nil + } + } + return nil, nil +} |