summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 13:26:12 +0300
committerValery Piashchynski <[email protected]>2021-05-27 13:26:12 +0300
commit0a9aea326045e56716f0736f7aa8520305362c51 (patch)
tree532ca326690d81e97136248dd798d23a56843278 /plugins/memory
parent57a30c2b49c36161b3af3e539a8618c2d39a5cc9 (diff)
- Move bst to the pkg folder
- Add comments - Fix all golangci-lint warnings Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/bst/bst.go136
-rw-r--r--plugins/memory/bst/bst_test.go37
-rw-r--r--plugins/memory/bst/interface.go11
-rw-r--r--plugins/memory/config.go8
-rw-r--r--plugins/memory/driver.go28
-rw-r--r--plugins/memory/plugin.go61
6 files changed, 35 insertions, 246 deletions
diff --git a/plugins/memory/bst/bst.go b/plugins/memory/bst/bst.go
deleted file mode 100644
index 3060ff11..00000000
--- a/plugins/memory/bst/bst.go
+++ /dev/null
@@ -1,136 +0,0 @@
-package bst
-
-// BST ...
-type BST struct {
- // registered topic, not unique
- topic string
- // associated connections with the topic
- uuids map[string]struct{}
-
- // left and right subtrees
- left *BST
- right *BST
-}
-
-func NewBST() Storage {
- return &BST{}
-}
-
-// Insert uuid to the topic
-func (b *BST) Insert(uuid string, topic string) {
- curr := b
-
- for {
- if curr.topic == topic {
- curr.uuids[uuid] = struct{}{}
- return
- }
- // if topic less than curr topic
- if curr.topic < topic {
- if curr.left == nil {
- curr.left = &BST{
- topic: topic,
- uuids: map[string]struct{}{uuid: {}},
- }
- return
- }
- // move forward
- curr = curr.left
- } else {
- if curr.right == nil {
- curr.right = &BST{
- topic: topic,
- uuids: map[string]struct{}{uuid: {}},
- }
- return
- }
-
- curr = curr.right
- }
- }
-}
-
-func (b *BST) Get(topic string) map[string]struct{} {
- curr := b
- for curr != nil {
- if curr.topic == topic {
- return curr.uuids
- }
- if curr.topic < topic {
- curr = curr.left
- continue
- }
- if curr.topic > topic {
- curr = curr.right
- continue
- }
- }
-
- return nil
-}
-
-func (b *BST) Remove(uuid string, topic string) {
- b.removeHelper(uuid, topic, nil)
-}
-
-func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
- curr := b
- for curr != nil {
- if topic < curr.topic {
- parent = curr
- curr = curr.left
- } else if topic > curr.topic {
- parent = curr
- curr = curr.right
- } else {
- if len(curr.uuids) > 1 {
- if _, ok := curr.uuids[uuid]; ok {
- delete(curr.uuids, uuid)
- return
- }
- }
-
- if curr.left != nil && curr.right != nil {
- curr.topic, curr.uuids = curr.right.traverseForMinString()
- curr.right.removeHelper(curr.topic, uuid, curr)
- } else if parent == nil {
- if curr.left != nil {
- curr.topic = curr.left.topic
- curr.uuids = curr.left.uuids
-
- curr.right = curr.left.right
- curr.left = curr.left.left
- } else if curr.right != nil {
- curr.topic = curr.right.topic
- curr.uuids = curr.right.uuids
-
- curr.left = curr.right.left
- curr.right = curr.right.right
- } else {
- // single node tree
- }
- } else if parent.left == curr {
- if curr.left != nil {
- parent.left = curr.left
- } else {
- parent.left = curr.right
- }
- } else if parent.right == curr {
- if curr.left != nil {
- parent.right = curr.left
- } else {
- parent.right = curr.right
- }
- }
- break
- }
- }
-}
-
-//go:inline
-func (b *BST) traverseForMinString() (string, map[string]struct{}) {
- if b.left == nil {
- return b.topic, b.uuids
- }
- return b.left.traverseForMinString()
-}
diff --git a/plugins/memory/bst/bst_test.go b/plugins/memory/bst/bst_test.go
deleted file mode 100644
index e8a13760..00000000
--- a/plugins/memory/bst/bst_test.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package bst
-
-import (
- "testing"
-
- "github.com/google/uuid"
- "github.com/stretchr/testify/assert"
-)
-
-func TestNewBST(t *testing.T) {
- // create a new bst
- g := NewBST()
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments")
- }
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments2")
- }
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments3")
- }
-
- // should be 100
- exist := g.Get("comments")
- assert.Len(t, exist, 100)
-
- // should be 100
- exist2 := g.Get("comments2")
- assert.Len(t, exist2, 100)
-
- // should be 100
- exist3 := g.Get("comments3")
- assert.Len(t, exist3, 100)
-}
diff --git a/plugins/memory/bst/interface.go b/plugins/memory/bst/interface.go
deleted file mode 100644
index ecf40414..00000000
--- a/plugins/memory/bst/interface.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package bst
-
-// Storage is general in-memory BST storage implementation
-type Storage interface {
- // Insert inserts to a vertex with topic ident connection uuid
- Insert(uuid string, topic string)
- // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
- Remove(uuid, topic string)
- // Get will return all connections associated with the topic
- Get(topic string) map[string]struct{}
-}
diff --git a/plugins/memory/config.go b/plugins/memory/config.go
deleted file mode 100644
index 08dd9fc3..00000000
--- a/plugins/memory/config.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package memory
-
-// Config for the memory driver is empty, it's just a placeholder
-type Config struct {
- Path string `mapstructure:"path"`
-}
-
-func (c *Config) InitDefaults() {}
diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go
deleted file mode 100644
index 5a96e773..00000000
--- a/plugins/memory/driver.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package memory
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/memory/bst"
-)
-
-type Driver struct {
- tree bst.Storage
-}
-
-func NewInMemoryDriver() bst.Storage {
- b := &Driver{
- tree: bst.NewBST(),
- }
- return b
-}
-
-func (d *Driver) Insert(uuid string, topic string) {
- d.tree.Insert(uuid, topic)
-}
-
-func (d *Driver) Remove(uuid, topic string) {
- d.tree.Remove(uuid, topic)
-}
-
-func (d *Driver) Get(topic string) map[string]struct{} {
- return d.tree.Get(topic)
-}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 5efd5522..2ad041aa 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -1,9 +1,9 @@
package memory
import (
- "github.com/spiral/errors"
+ "sync"
+
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -13,28 +13,16 @@ const (
type Plugin struct {
log logger.Logger
- cfg *Config
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("memory_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- p.log = log
- return nil
-}
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("memory_plugin_serve")
- errCh := make(chan error)
-
- return errCh
+ // channel with the messages from the RPC
+ pushCh chan pubsub.Message
+ // user-subscribed topics
+ topics sync.Map
}
-func (p *Plugin) Stop() error {
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pushCh = make(chan pubsub.Message, 100)
return nil
}
@@ -47,21 +35,42 @@ func (p *Plugin) Name() string {
}
func (p *Plugin) Publish(messages []pubsub.Message) error {
- panic("implement me")
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ return nil
}
func (p *Plugin) PublishAsync(messages []pubsub.Message) {
- panic("implement me")
+ go func() {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ }()
}
func (p *Plugin) Subscribe(topics ...string) error {
- panic("implement me")
+ for i := 0; i < len(topics); i++ {
+ p.topics.Store(topics[i], struct{}{})
+ }
+ return nil
}
func (p *Plugin) Unsubscribe(topics ...string) error {
- panic("implement me")
+ for i := 0; i < len(topics); i++ {
+ p.topics.Delete(topics[i])
+ }
+ return nil
}
func (p *Plugin) Next() (pubsub.Message, error) {
- panic("implement me")
+ msg := <-p.pushCh
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(msg.Topics()); i++ {
+ if _, ok := p.topics.Load(msg.Topics()[i]); ok {
+ return msg, nil
+ }
+ }
+ return nil, nil
}