summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
committerValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
commitd0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch)
tree7e6ec1a320f596b31f205caee5d5753eaa42f4ff /pkg
parent0323e070103cc2c30d2cdfb12719d753acafe151 (diff)
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/bst/bst.go16
-rw-r--r--pkg/bst/interface.go2
-rw-r--r--pkg/pubsub/interface.go13
3 files changed, 29 insertions, 2 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
index 664937ba..f8426b12 100644
--- a/pkg/bst/bst.go
+++ b/pkg/bst/bst.go
@@ -52,6 +52,22 @@ func (b *BST) Insert(uuid string, topic string) {
}
}
+func (b *BST) Contains(topic string) bool {
+ curr := b
+ for curr != nil {
+ switch {
+ case topic < curr.topic:
+ curr = curr.left
+ case topic > curr.topic:
+ curr = curr.right
+ case topic == curr.topic:
+ return true
+ }
+ }
+
+ return false
+}
+
func (b *BST) Get(topic string) map[string]struct{} {
curr := b
for curr != nil {
diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go
index ecf40414..95b03e11 100644
--- a/pkg/bst/interface.go
+++ b/pkg/bst/interface.go
@@ -8,4 +8,6 @@ type Storage interface {
Remove(uuid, topic string)
// Get will return all connections associated with the topic
Get(topic string) map[string]struct{}
+ // Contains checks if the BST contains a topic
+ Contains(topic string) bool
}
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index 18c1a80c..2d5d9595 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -2,6 +2,10 @@ package pubsub
import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+/*
+This interface is in BETA. It might be changed.
+*/
+
// PubSub ...
type PubSub interface {
Publisher
@@ -10,15 +14,20 @@ type PubSub interface {
}
// Subscriber defines the ability to operate as message passing broker.
+// BETA interface
type Subscriber interface {
// Subscribe broker to one or multiple topics.
- Subscribe(topics ...string) error
+ Subscribe(connectionID string, topics ...string) error
// Unsubscribe from one or multiply topics
- Unsubscribe(topics ...string) error
+ Unsubscribe(connectionID string, topics ...string) error
+
+ // Connections returns all connections associated with the particular topic
+ Connections(topic string, ret map[string]struct{})
}
// Publisher publish one or more messages
+// BETA interface
type Publisher interface {
// Publish one or multiple Channel.
Publish(messages []byte) error