summaryrefslogtreecommitdiff
path: root/plugins/memory/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r--plugins/memory/plugin.go61
1 files changed, 35 insertions, 26 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 5efd5522..2ad041aa 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -1,9 +1,9 @@
package memory
import (
- "github.com/spiral/errors"
+ "sync"
+
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -13,28 +13,16 @@ const (
type Plugin struct {
log logger.Logger
- cfg *Config
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("memory_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- p.log = log
- return nil
-}
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("memory_plugin_serve")
- errCh := make(chan error)
-
- return errCh
+ // channel with the messages from the RPC
+ pushCh chan pubsub.Message
+ // user-subscribed topics
+ topics sync.Map
}
-func (p *Plugin) Stop() error {
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pushCh = make(chan pubsub.Message, 100)
return nil
}
@@ -47,21 +35,42 @@ func (p *Plugin) Name() string {
}
func (p *Plugin) Publish(messages []pubsub.Message) error {
- panic("implement me")
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ return nil
}
func (p *Plugin) PublishAsync(messages []pubsub.Message) {
- panic("implement me")
+ go func() {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ }()
}
func (p *Plugin) Subscribe(topics ...string) error {
- panic("implement me")
+ for i := 0; i < len(topics); i++ {
+ p.topics.Store(topics[i], struct{}{})
+ }
+ return nil
}
func (p *Plugin) Unsubscribe(topics ...string) error {
- panic("implement me")
+ for i := 0; i < len(topics); i++ {
+ p.topics.Delete(topics[i])
+ }
+ return nil
}
func (p *Plugin) Next() (pubsub.Message, error) {
- panic("implement me")
+ msg := <-p.pushCh
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(msg.Topics()); i++ {
+ if _, ok := p.topics.Load(msg.Topics()[i]); ok {
+ return msg, nil
+ }
+ }
+ return nil, nil
}