summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
committerValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
commitdc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch)
tree6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /plugins/memory
parentd2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff)
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/bst/bst.go136
-rw-r--r--plugins/memory/bst/bst_test.go37
-rw-r--r--plugins/memory/bst/interface.go11
-rw-r--r--plugins/memory/config.go8
-rw-r--r--plugins/memory/driver.go28
-rw-r--r--plugins/memory/plugin.go67
6 files changed, 287 insertions, 0 deletions
diff --git a/plugins/memory/bst/bst.go b/plugins/memory/bst/bst.go
new file mode 100644
index 00000000..3060ff11
--- /dev/null
+++ b/plugins/memory/bst/bst.go
@@ -0,0 +1,136 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{}
+}
+
+// Insert uuid to the topic
+func (b *BST) Insert(uuid string, topic string) {
+ curr := b
+
+ for {
+ if curr.topic == topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if curr.topic < topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (b *BST) Get(topic string) map[string]struct{} {
+ curr := b
+ for curr != nil {
+ if curr.topic == topic {
+ return curr.uuids
+ }
+ if curr.topic < topic {
+ curr = curr.left
+ continue
+ }
+ if curr.topic > topic {
+ curr = curr.right
+ continue
+ }
+ }
+
+ return nil
+}
+
+func (b *BST) Remove(uuid string, topic string) {
+ b.removeHelper(uuid, topic, nil)
+}
+
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+ curr := b
+ for curr != nil {
+ if topic < curr.topic {
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil {
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil {
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else {
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (b *BST) traverseForMinString() (string, map[string]struct{}) {
+ if b.left == nil {
+ return b.topic, b.uuids
+ }
+ return b.left.traverseForMinString()
+}
diff --git a/plugins/memory/bst/bst_test.go b/plugins/memory/bst/bst_test.go
new file mode 100644
index 00000000..e8a13760
--- /dev/null
+++ b/plugins/memory/bst/bst_test.go
@@ -0,0 +1,37 @@
+package bst
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewBST(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ // should be 100
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ // should be 100
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ // should be 100
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
diff --git a/plugins/memory/bst/interface.go b/plugins/memory/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/plugins/memory/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
diff --git a/plugins/memory/config.go b/plugins/memory/config.go
new file mode 100644
index 00000000..08dd9fc3
--- /dev/null
+++ b/plugins/memory/config.go
@@ -0,0 +1,8 @@
+package memory
+
+// Config for the memory driver is empty, it's just a placeholder
+type Config struct {
+ Path string `mapstructure:"path"`
+}
+
+func (c *Config) InitDefaults() {}
diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go
new file mode 100644
index 00000000..5a96e773
--- /dev/null
+++ b/plugins/memory/driver.go
@@ -0,0 +1,28 @@
+package memory
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+)
+
+type Driver struct {
+ tree bst.Storage
+}
+
+func NewInMemoryDriver() bst.Storage {
+ b := &Driver{
+ tree: bst.NewBST(),
+ }
+ return b
+}
+
+func (d *Driver) Insert(uuid string, topic string) {
+ d.tree.Insert(uuid, topic)
+}
+
+func (d *Driver) Remove(uuid, topic string) {
+ d.tree.Remove(uuid, topic)
+}
+
+func (d *Driver) Get(topic string) map[string]struct{} {
+ return d.tree.Get(topic)
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
new file mode 100644
index 00000000..5efd5522
--- /dev/null
+++ b/plugins/memory/plugin.go
@@ -0,0 +1,67 @@
+package memory
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg *Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("memory_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("memory_plugin_serve")
+ errCh := make(chan error)
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Publish(messages []pubsub.Message) error {
+ panic("implement me")
+}
+
+func (p *Plugin) PublishAsync(messages []pubsub.Message) {
+ panic("implement me")
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ panic("implement me")
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ panic("implement me")
+}
+
+func (p *Plugin) Next() (pubsub.Message, error) {
+ panic("implement me")
+}