diff options
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r-- | plugins/memory/plugin.go | 61 |
1 files changed, 35 insertions, 26 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 5efd5522..2ad041aa 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -1,9 +1,9 @@ package memory import ( - "github.com/spiral/errors" + "sync" + "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -13,28 +13,16 @@ const ( type Plugin struct { log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("memory_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - p.log = log - return nil -} -func (p *Plugin) Serve() chan error { - const op = errors.Op("memory_plugin_serve") - errCh := make(chan error) - - return errCh + // channel with the messages from the RPC + pushCh chan pubsub.Message + // user-subscribed topics + topics sync.Map } -func (p *Plugin) Stop() error { +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pushCh = make(chan pubsub.Message, 100) return nil } @@ -47,21 +35,42 @@ func (p *Plugin) Name() string { } func (p *Plugin) Publish(messages []pubsub.Message) error { - panic("implement me") + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + return nil } func (p *Plugin) PublishAsync(messages []pubsub.Message) { - panic("implement me") + go func() { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + }() } func (p *Plugin) Subscribe(topics ...string) error { - panic("implement me") + for i := 0; i < len(topics); i++ { + p.topics.Store(topics[i], struct{}{}) + } + return nil } func (p *Plugin) Unsubscribe(topics ...string) error { - panic("implement me") + for i := 0; i < len(topics); i++ { + p.topics.Delete(topics[i]) + } + return nil } func (p *Plugin) Next() (pubsub.Message, error) { - panic("implement me") + msg := <-p.pushCh + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(msg.Topics()); i++ { + if _, ok := p.topics.Load(msg.Topics()[i]); ok { + return msg, nil + } + } + return nil, nil } |