diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
commit | dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch) | |
tree | 6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /plugins/broadcast/memory | |
parent | d2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff) |
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub
- New plugin - websockets
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/memory')
-rw-r--r-- | plugins/broadcast/memory/bst/bst.go | 134 | ||||
-rw-r--r-- | plugins/broadcast/memory/bst/bst_test.go | 33 | ||||
-rw-r--r-- | plugins/broadcast/memory/bst/interface.go | 11 | ||||
-rw-r--r-- | plugins/broadcast/memory/config.go | 6 | ||||
-rw-r--r-- | plugins/broadcast/memory/driver.go | 29 | ||||
-rw-r--r-- | plugins/broadcast/memory/plugin.go | 67 |
6 files changed, 0 insertions, 280 deletions
diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go deleted file mode 100644 index 7d09a10f..00000000 --- a/plugins/broadcast/memory/bst/bst.go +++ /dev/null @@ -1,134 +0,0 @@ -package bst - -// BST ... -type BST struct { - // registered topic, not unique - topic string - // associated connections with the topic - uuids map[string]struct{} - - // left and right subtrees - left *BST - right *BST -} - -func NewBST() Storage { - return &BST{} -} - -// Insert uuid to the topic -func (B *BST) Insert(uuid string, topic string) { - curr := B - - for { - if curr.topic == topic { - curr.uuids[uuid] = struct{}{} - return - } - // if topic less than curr topic - if curr.topic < topic { - if curr.left == nil { - curr.left = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - // move forward - curr = curr.left - } else { - if curr.right == nil { - curr.right = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - - curr = curr.right - } - } -} - -func (B *BST) Get(topic string) map[string]struct{} { - curr := B - for curr != nil { - if curr.topic == topic { - return curr.uuids - } - if curr.topic < topic { - curr = curr.left - } - if curr.topic > topic { - curr = curr.right - } - } - - return nil -} - -func (B *BST) Remove(uuid string, topic string) { - B.removeHelper(uuid, topic, nil) -} - -func (B *BST) removeHelper(uuid string, topic string, parent *BST) { - curr := B - for curr != nil { - if topic < curr.topic { - parent = curr - curr = curr.left - } else if topic > curr.topic { - parent = curr - curr = curr.right - } else { - if len(curr.uuids) > 1 { - if _, ok := curr.uuids[uuid]; ok { - delete(curr.uuids, uuid) - return - } - } - - if curr.left != nil && curr.right != nil { - curr.topic, curr.uuids = curr.right.traverseForMinString() - curr.right.removeHelper(curr.topic, uuid, curr) - } else if parent == nil { - if curr.left != nil { - curr.topic = curr.left.topic - curr.uuids = curr.left.uuids - - curr.right = curr.left.right - curr.left = curr.left.left - } else if curr.right != nil { - curr.topic = curr.right.topic - curr.uuids = curr.right.uuids - - curr.left = curr.right.left - curr.right = curr.right.right - } else { - // single node tree - } - } else if parent.left == curr { - if curr.left != nil { - parent.left = curr.left - } else { - parent.left = curr.right - } - } else if parent.right == curr { - if curr.left != nil { - parent.right = curr.left - } else { - parent.right = curr.right - } - } - break - } - } -} - -//go:inline -func (B *BST) traverseForMinString() (string, map[string]struct{}) { - if B.left == nil { - return B.topic, B.uuids - } - return B.left.traverseForMinString() -} diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go deleted file mode 100644 index b5ad6c10..00000000 --- a/plugins/broadcast/memory/bst/bst_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package bst - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -func TestNewBST(t *testing.T) { - g := NewBST() - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments2") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments3") - } - - exist := g.Get("comments") - assert.Len(t, exist, 100) - - exist2 := g.Get("comments2") - assert.Len(t, exist2, 100) - - exist3 := g.Get("comments3") - assert.Len(t, exist3, 100) -} diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go deleted file mode 100644 index ecf40414..00000000 --- a/plugins/broadcast/memory/bst/interface.go +++ /dev/null @@ -1,11 +0,0 @@ -package bst - -// Storage is general in-memory BST storage implementation -type Storage interface { - // Insert inserts to a vertex with topic ident connection uuid - Insert(uuid string, topic string) - // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed - Remove(uuid, topic string) - // Get will return all connections associated with the topic - Get(topic string) map[string]struct{} -} diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go deleted file mode 100644 index e80695bc..00000000 --- a/plugins/broadcast/memory/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package memory - -// Config for the memory driver is empty, it's just a placeholder -type Config struct{} - -func (c *Config) InitDefaults() {} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go deleted file mode 100644 index 80527e4b..00000000 --- a/plugins/broadcast/memory/driver.go +++ /dev/null @@ -1,29 +0,0 @@ -package memory - -import ( - "github.com/spiral/roadrunner/v2/plugins/broadcast" -) - -type Driver struct { -} - -func NewInMemoryDriver() broadcast.Storage { - b := &Driver{} - return b -} - -func (d *Driver) Store(uuid string, topics ...string) { - panic("implement me") -} - -func (d *Driver) StorePattern(uuid string, pattern string) { - panic("implement me") -} - -func (d *Driver) GetConnection(pattern string) []string { - panic("implement me") -} - -func (d *Driver) Construct(key string) (broadcast.Storage, error) { - return nil, nil -} diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go deleted file mode 100644 index 2bd894a0..00000000 --- a/plugins/broadcast/memory/plugin.go +++ /dev/null @@ -1,67 +0,0 @@ -package memory - -import ( - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "broadcast" - SectionName string = "memory" -) - -type Plugin struct { - log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("memory_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, errors.Disabled, err) - } - - p.cfg.InitDefaults() - - p.log = log - return nil -} - -func (p *Plugin) Serve() chan error { - const op = errors.Op("memory_plugin_serve") - errCh := make(chan error) - - return errCh -} - -func (p *Plugin) Stop() error { - - return nil -} - -// Available interface implementation for the plugin -func (p *Plugin) Available() {} - -// Name is endure.Named interface implementation -func (p *Plugin) Name() string { - // broadcast.memory - return fmt.Sprintf("%s.%s", PluginName, SectionName) -} - -func (p *Plugin) Publish(msg []*broadcast.Message) error { - return nil -} |