diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 13:26:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 13:26:12 +0300 |
commit | 0a9aea326045e56716f0736f7aa8520305362c51 (patch) | |
tree | 532ca326690d81e97136248dd798d23a56843278 /pkg | |
parent | 57a30c2b49c36161b3af3e539a8618c2d39a5cc9 (diff) |
- Move bst to the pkg folder
- Add comments
- Fix all golangci-lint warnings
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/bst/bst.go | 136 | ||||
-rw-r--r-- | pkg/bst/bst_test.go | 37 | ||||
-rw-r--r-- | pkg/bst/doc.go | 7 | ||||
-rw-r--r-- | pkg/bst/interface.go | 11 | ||||
-rw-r--r-- | pkg/pubsub/message.go | 38 |
5 files changed, 199 insertions, 30 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go new file mode 100644 index 00000000..8477ceee --- /dev/null +++ b/pkg/bst/bst.go @@ -0,0 +1,136 @@ +package bst + +// BST ... +type BST struct { + // registered topic, not unique + topic string + // associated connections with the topic + uuids map[string]struct{} + + // left and right subtrees + left *BST + right *BST +} + +func NewBST() Storage { + return &BST{} +} + +// Insert uuid to the topic +func (b *BST) Insert(uuid string, topic string) { + curr := b + + for { + if curr.topic == topic { + curr.uuids[uuid] = struct{}{} + return + } + // if topic less than curr topic + if curr.topic < topic { + if curr.left == nil { + curr.left = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + // move forward + curr = curr.left + } else { + if curr.right == nil { + curr.right = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + + curr = curr.right + } + } +} + +func (b *BST) Get(topic string) map[string]struct{} { + curr := b + for curr != nil { + if curr.topic == topic { + return curr.uuids + } + if curr.topic < topic { + curr = curr.left + continue + } + if curr.topic > topic { + curr = curr.right + continue + } + } + + return nil +} + +func (b *BST) Remove(uuid string, topic string) { + b.removeHelper(uuid, topic, nil) +} + +func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit + curr := b + for curr != nil { + if topic < curr.topic { //nolint:gocritic + parent = curr + curr = curr.left + } else if topic > curr.topic { + parent = curr + curr = curr.right + } else { + if len(curr.uuids) > 1 { + if _, ok := curr.uuids[uuid]; ok { + delete(curr.uuids, uuid) + return + } + } + + if curr.left != nil && curr.right != nil { //nolint:gocritic + curr.topic, curr.uuids = curr.right.traverseForMinString() + curr.right.removeHelper(curr.topic, uuid, curr) + } else if parent == nil { + if curr.left != nil { //nolint:gocritic + curr.topic = curr.left.topic + curr.uuids = curr.left.uuids + + curr.right = curr.left.right + curr.left = curr.left.left + } else if curr.right != nil { + curr.topic = curr.right.topic + curr.uuids = curr.right.uuids + + curr.left = curr.right.left + curr.right = curr.right.right + } else { //nolint:staticcheck + // single node tree + } + } else if parent.left == curr { + if curr.left != nil { + parent.left = curr.left + } else { + parent.left = curr.right + } + } else if parent.right == curr { + if curr.left != nil { + parent.right = curr.left + } else { + parent.right = curr.right + } + } + break + } + } +} + +//go:inline +func (b *BST) traverseForMinString() (string, map[string]struct{}) { + if b.left == nil { + return b.topic, b.uuids + } + return b.left.traverseForMinString() +} diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go new file mode 100644 index 00000000..e8a13760 --- /dev/null +++ b/pkg/bst/bst_test.go @@ -0,0 +1,37 @@ +package bst + +import ( + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestNewBST(t *testing.T) { + // create a new bst + g := NewBST() + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments2") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments3") + } + + // should be 100 + exist := g.Get("comments") + assert.Len(t, exist, 100) + + // should be 100 + exist2 := g.Get("comments2") + assert.Len(t, exist2, 100) + + // should be 100 + exist3 := g.Get("comments3") + assert.Len(t, exist3, 100) +} diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go new file mode 100644 index 00000000..abb7e6e9 --- /dev/null +++ b/pkg/bst/doc.go @@ -0,0 +1,7 @@ +package bst + +/* +Binary search tree for the pubsub + +The vertex may have one or multiply topics associated with the single websocket connection UUID +*/ diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go new file mode 100644 index 00000000..ecf40414 --- /dev/null +++ b/pkg/bst/interface.go @@ -0,0 +1,11 @@ +package bst + +// Storage is general in-memory BST storage implementation +type Storage interface { + // Insert inserts to a vertex with topic ident connection uuid + Insert(uuid string, topic string) + // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed + Remove(uuid, topic string) + // Get will return all connections associated with the topic + Get(topic string) map[string]struct{} +} diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go index ab74eb98..2536aece 100644 --- a/pkg/pubsub/message.go +++ b/pkg/pubsub/message.go @@ -6,59 +6,37 @@ import ( type Msg struct { // Topic message been pushed into. - T []string `json:"topic"` + Topics_ []string `json:"topic"` // Command (join, leave, headers) - C string `json:"command"` + Command_ string `json:"command"` // Broker (redis, memory) - B string `json:"broker"` + Broker_ string `json:"broker"` // Payload to be broadcasted - P []byte `json:"payload"` + Payload_ []byte `json:"payload"` } -//func (m Msg) UnmarshalBinary(data []byte) error { -// //Use default gob decoder -// reader := bytes.NewReader(data) -// dec := gob.NewDecoder(reader) -// if err := dec.Decode(&m); err != nil { -// return err -// } -// -// return nil -//} - func (m *Msg) MarshalBinary() ([]byte, error) { - //buf := new(bytes.Buffer) - // - //for i := 0; i < len(m.T); i++ { - // buf.WriteString(m.T[i]) - //} - // - //buf.WriteString(m.C) - //buf.WriteString(m.B) - //buf.Write(m.P) - return json.Marshal(m) - } // Payload in raw bytes func (m *Msg) Payload() []byte { - return m.P + return m.Payload_ } // Command for the connection func (m *Msg) Command() string { - return m.C + return m.Command_ } // Topics to subscribe func (m *Msg) Topics() []string { - return m.T + return m.Topics_ } func (m *Msg) Broker() string { - return m.B + return m.Broker_ } |