diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 13:26:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 13:26:12 +0300 |
commit | 0a9aea326045e56716f0736f7aa8520305362c51 (patch) | |
tree | 532ca326690d81e97136248dd798d23a56843278 /plugins/memory | |
parent | 57a30c2b49c36161b3af3e539a8618c2d39a5cc9 (diff) |
- Move bst to the pkg folder
- Add comments
- Fix all golangci-lint warnings
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r-- | plugins/memory/bst/bst.go | 136 | ||||
-rw-r--r-- | plugins/memory/bst/bst_test.go | 37 | ||||
-rw-r--r-- | plugins/memory/bst/interface.go | 11 | ||||
-rw-r--r-- | plugins/memory/config.go | 8 | ||||
-rw-r--r-- | plugins/memory/driver.go | 28 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 61 |
6 files changed, 35 insertions, 246 deletions
diff --git a/plugins/memory/bst/bst.go b/plugins/memory/bst/bst.go deleted file mode 100644 index 3060ff11..00000000 --- a/plugins/memory/bst/bst.go +++ /dev/null @@ -1,136 +0,0 @@ -package bst - -// BST ... -type BST struct { - // registered topic, not unique - topic string - // associated connections with the topic - uuids map[string]struct{} - - // left and right subtrees - left *BST - right *BST -} - -func NewBST() Storage { - return &BST{} -} - -// Insert uuid to the topic -func (b *BST) Insert(uuid string, topic string) { - curr := b - - for { - if curr.topic == topic { - curr.uuids[uuid] = struct{}{} - return - } - // if topic less than curr topic - if curr.topic < topic { - if curr.left == nil { - curr.left = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - // move forward - curr = curr.left - } else { - if curr.right == nil { - curr.right = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - - curr = curr.right - } - } -} - -func (b *BST) Get(topic string) map[string]struct{} { - curr := b - for curr != nil { - if curr.topic == topic { - return curr.uuids - } - if curr.topic < topic { - curr = curr.left - continue - } - if curr.topic > topic { - curr = curr.right - continue - } - } - - return nil -} - -func (b *BST) Remove(uuid string, topic string) { - b.removeHelper(uuid, topic, nil) -} - -func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit - curr := b - for curr != nil { - if topic < curr.topic { - parent = curr - curr = curr.left - } else if topic > curr.topic { - parent = curr - curr = curr.right - } else { - if len(curr.uuids) > 1 { - if _, ok := curr.uuids[uuid]; ok { - delete(curr.uuids, uuid) - return - } - } - - if curr.left != nil && curr.right != nil { - curr.topic, curr.uuids = curr.right.traverseForMinString() - curr.right.removeHelper(curr.topic, uuid, curr) - } else if parent == nil { - if curr.left != nil { - curr.topic = curr.left.topic - curr.uuids = curr.left.uuids - - curr.right = curr.left.right - curr.left = curr.left.left - } else if curr.right != nil { - curr.topic = curr.right.topic - curr.uuids = curr.right.uuids - - curr.left = curr.right.left - curr.right = curr.right.right - } else { - // single node tree - } - } else if parent.left == curr { - if curr.left != nil { - parent.left = curr.left - } else { - parent.left = curr.right - } - } else if parent.right == curr { - if curr.left != nil { - parent.right = curr.left - } else { - parent.right = curr.right - } - } - break - } - } -} - -//go:inline -func (b *BST) traverseForMinString() (string, map[string]struct{}) { - if b.left == nil { - return b.topic, b.uuids - } - return b.left.traverseForMinString() -} diff --git a/plugins/memory/bst/bst_test.go b/plugins/memory/bst/bst_test.go deleted file mode 100644 index e8a13760..00000000 --- a/plugins/memory/bst/bst_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package bst - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -func TestNewBST(t *testing.T) { - // create a new bst - g := NewBST() - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments2") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments3") - } - - // should be 100 - exist := g.Get("comments") - assert.Len(t, exist, 100) - - // should be 100 - exist2 := g.Get("comments2") - assert.Len(t, exist2, 100) - - // should be 100 - exist3 := g.Get("comments3") - assert.Len(t, exist3, 100) -} diff --git a/plugins/memory/bst/interface.go b/plugins/memory/bst/interface.go deleted file mode 100644 index ecf40414..00000000 --- a/plugins/memory/bst/interface.go +++ /dev/null @@ -1,11 +0,0 @@ -package bst - -// Storage is general in-memory BST storage implementation -type Storage interface { - // Insert inserts to a vertex with topic ident connection uuid - Insert(uuid string, topic string) - // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed - Remove(uuid, topic string) - // Get will return all connections associated with the topic - Get(topic string) map[string]struct{} -} diff --git a/plugins/memory/config.go b/plugins/memory/config.go deleted file mode 100644 index 08dd9fc3..00000000 --- a/plugins/memory/config.go +++ /dev/null @@ -1,8 +0,0 @@ -package memory - -// Config for the memory driver is empty, it's just a placeholder -type Config struct { - Path string `mapstructure:"path"` -} - -func (c *Config) InitDefaults() {} diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go deleted file mode 100644 index 5a96e773..00000000 --- a/plugins/memory/driver.go +++ /dev/null @@ -1,28 +0,0 @@ -package memory - -import ( - "github.com/spiral/roadrunner/v2/plugins/memory/bst" -) - -type Driver struct { - tree bst.Storage -} - -func NewInMemoryDriver() bst.Storage { - b := &Driver{ - tree: bst.NewBST(), - } - return b -} - -func (d *Driver) Insert(uuid string, topic string) { - d.tree.Insert(uuid, topic) -} - -func (d *Driver) Remove(uuid, topic string) { - d.tree.Remove(uuid, topic) -} - -func (d *Driver) Get(topic string) map[string]struct{} { - return d.tree.Get(topic) -} diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 5efd5522..2ad041aa 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -1,9 +1,9 @@ package memory import ( - "github.com/spiral/errors" + "sync" + "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -13,28 +13,16 @@ const ( type Plugin struct { log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("memory_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - p.log = log - return nil -} -func (p *Plugin) Serve() chan error { - const op = errors.Op("memory_plugin_serve") - errCh := make(chan error) - - return errCh + // channel with the messages from the RPC + pushCh chan pubsub.Message + // user-subscribed topics + topics sync.Map } -func (p *Plugin) Stop() error { +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pushCh = make(chan pubsub.Message, 100) return nil } @@ -47,21 +35,42 @@ func (p *Plugin) Name() string { } func (p *Plugin) Publish(messages []pubsub.Message) error { - panic("implement me") + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + return nil } func (p *Plugin) PublishAsync(messages []pubsub.Message) { - panic("implement me") + go func() { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + }() } func (p *Plugin) Subscribe(topics ...string) error { - panic("implement me") + for i := 0; i < len(topics); i++ { + p.topics.Store(topics[i], struct{}{}) + } + return nil } func (p *Plugin) Unsubscribe(topics ...string) error { - panic("implement me") + for i := 0; i < len(topics); i++ { + p.topics.Delete(topics[i]) + } + return nil } func (p *Plugin) Next() (pubsub.Message, error) { - panic("implement me") + msg := <-p.pushCh + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(msg.Topics()); i++ { + if _, ok := p.topics.Load(msg.Topics()[i]); ok { + return msg, nil + } + } + return nil, nil } |