diff options
Diffstat (limited to 'plugins/broadcast/memory')
-rw-r--r-- | plugins/broadcast/memory/config.go | 6 | ||||
-rw-r--r-- | plugins/broadcast/memory/driver.go | 42 | ||||
-rw-r--r-- | plugins/broadcast/memory/memory.go | 131 | ||||
-rw-r--r-- | plugins/broadcast/memory/memory_test.go | 80 | ||||
-rw-r--r-- | plugins/broadcast/memory/plugin.go | 71 |
5 files changed, 119 insertions, 211 deletions
diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go new file mode 100644 index 00000000..840dbb96 --- /dev/null +++ b/plugins/broadcast/memory/config.go @@ -0,0 +1,6 @@ +package memory + +// Config for the memory driver is empty, it's just a placeholder +type Config struct {} + +func (c *Config) InitDefaults() {} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go new file mode 100644 index 00000000..8a9374c9 --- /dev/null +++ b/plugins/broadcast/memory/driver.go @@ -0,0 +1,42 @@ +package memory + +import "github.com/spiral/roadrunner/v2/plugins/broadcast" + +type Driver struct { + +} + +func NewInMemoryDriver() broadcast.Broker { + b := &Driver{ + + } + return b +} + +func (d *Driver) Serve() error { + panic("implement me") +} + +func (d *Driver) Stop() { + panic("implement me") +} + +func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error { + panic("implement me") +} + +func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { + panic("implement me") +} + +func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { + panic("implement me") +} + +func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { + panic("implement me") +} + +func (d *Driver) Publish(messages ...*broadcast.Message) error { + panic("implement me") +} diff --git a/plugins/broadcast/memory/memory.go b/plugins/broadcast/memory/memory.go deleted file mode 100644 index 5b85d68f..00000000 --- a/plugins/broadcast/memory/memory.go +++ /dev/null @@ -1,131 +0,0 @@ -package memory - -import ( - "errors" - "sync/atomic" -) - -// Memory manages broadcasting in memory. -type Memory struct { - router *Router - messages chan *Message - join, leave chan subscriber - stop chan interface{} - stopped int32 -} - -// memoryBroker creates new memory based message broker. -func memoryBroker() *Memory { - return &Memory{ - router: NewRouter(), - messages: make(chan *Message), - join: make(chan subscriber), - leave: make(chan subscriber), - stop: make(chan interface{}), - stopped: 0, - } -} - -// Serve serves broker. -func (m *Memory) Serve() error { - for { - select { - case ctx := <-m.join: - ctx.done <- m.handleJoin(ctx) - case ctx := <-m.leave: - ctx.done <- m.handleLeave(ctx) - case msg := <-m.messages: - m.router.Dispatch(msg) - case <-m.stop: - return nil - } - } -} - -func (m *Memory) handleJoin(sub subscriber) (err error) { - if sub.pattern != "" { - _, err = m.router.SubscribePattern(sub.upstream, sub.pattern) - return err - } - - m.router.Subscribe(sub.upstream, sub.topics...) - return nil -} - -func (m *Memory) handleLeave(sub subscriber) error { - if sub.pattern != "" { - m.router.UnsubscribePattern(sub.upstream, sub.pattern) - return nil - } - - m.router.Unsubscribe(sub.upstream, sub.topics...) - return nil -} - -// Stop closes the consumption and disconnects broker. -func (m *Memory) Stop() { - if atomic.CompareAndSwapInt32(&m.stopped, 0, 1) { - close(m.stop) - } -} - -// Subscribe broker to one or multiple channels. -func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)} - - m.join <- ctx - return <-ctx.done -} - -// SubscribePattern broker to pattern. -func (m *Memory) SubscribePattern(upstream chan *Message, pattern string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)} - - m.join <- ctx - return <-ctx.done -} - -// Unsubscribe broker from one or multiple channels. -func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)} - - m.leave <- ctx - return <-ctx.done -} - -// UnsubscribePattern broker from pattern. -func (m *Memory) UnsubscribePattern(upstream chan *Message, pattern string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)} - - m.leave <- ctx - return <-ctx.done -} - -// Publish one or multiple Channel. -func (m *Memory) Publish(messages ...*Message) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - for _, msg := range messages { - m.messages <- msg - } - - return nil -} diff --git a/plugins/broadcast/memory/memory_test.go b/plugins/broadcast/memory/memory_test.go deleted file mode 100644 index 0eb8d03e..00000000 --- a/plugins/broadcast/memory/memory_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package memory - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMemory_Broadcast(t *testing.T) { - br, _, c := setup(`{}`) - defer c.Stop() - - client := br.NewClient() - defer client.Close() - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered - - assert.NoError(t, client.Subscribe("topic")) - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) - assert.Equal(t, `hello1`, readStr(<-client.Channel())) - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello2"))) - assert.Equal(t, `hello2`, readStr(<-client.Channel())) - - assert.NoError(t, client.Unsubscribe("topic")) - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello3"))) - - assert.NoError(t, client.Subscribe("topic")) - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello4"))) - assert.Equal(t, `hello4`, readStr(<-client.Channel())) -} - -func TestMemory_BroadcastPattern(t *testing.T) { - br, _, c := setup(`{}`) - defer c.Stop() - - client := br.NewClient() - defer client.Close() - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered - - assert.NoError(t, client.SubscribePattern("topic/*")) - - assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1"))) - assert.Equal(t, `hello1`, readStr(<-client.Channel())) - - assert.NoError(t, client.Publish(newMessage("topic/1", "hello1"))) - assert.Equal(t, `hello1`, readStr(<-client.Channel())) - - assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello2"))) - assert.Equal(t, `hello2`, readStr(<-client.Channel())) - - assert.NoError(t, br.Broker().Publish(newMessage("different", "hello4"))) - assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello5"))) - - assert.Equal(t, `hello5`, readStr(<-client.Channel())) - - assert.NoError(t, client.UnsubscribePattern("topic/*")) - - assert.NoError(t, br.Broker().Publish(newMessage("topic/3", "hello6"))) - - assert.NoError(t, client.SubscribePattern("topic/*")) - - assert.NoError(t, br.Broker().Publish(newMessage("topic/4", "hello7"))) - assert.Equal(t, `hello7`, readStr(<-client.Channel())) -} - -func TestMemory_NotActive(t *testing.T) { - b := memoryBroker() - b.stopped = 1 - - assert.Error(t, b.Publish(nil)) - assert.Error(t, b.Subscribe(nil)) - assert.Error(t, b.Unsubscribe(nil)) - assert.Error(t, b.SubscribePattern(nil, "")) - assert.Error(t, b.UnsubscribePattern(nil, "")) -} diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go new file mode 100644 index 00000000..4ebeb4c8 --- /dev/null +++ b/plugins/broadcast/memory/plugin.go @@ -0,0 +1,71 @@ +package memory + +import ( + "fmt" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "broadcast" + SectionName string = "memory" +) + +type Plugin struct { + log logger.Logger + cfg *Config + +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("memory_plugin_init") + + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + + p.cfg.InitDefaults() + + p.log = log + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("memory_plugin_serve") + errCh := make(chan error) + + + return errCh +} + +func (p *Plugin) Stop() error { + + return nil +} + +// Available interface implementation for the plugin +func (p *Plugin) Available() {} + +// Name is endure.Named interface implementation +func (p *Plugin) Name() string { + // broadcast.memory + return fmt.Sprintf("%s.%s", PluginName, SectionName) +} + + + +func (p *Plugin) Publish(msg []*broadcast.Message) error { + return nil +} |