summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-20 22:46:19 +0300
committerValery Piashchynski <[email protected]>2021-05-20 22:46:19 +0300
commitd2e9d8320857f5768c54843a43ad16f59d6a3e8f (patch)
treef6f46e688b6005b2b0ea10c7238e925c0b58f25a /plugins/broadcast
parentf85172106b4723b705aa75c3c310e8cebd050a8d (diff)
- Update linters
- Implement base interfaces - Implement BST search algo for the in-memory storage Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/config.go9
-rw-r--r--plugins/broadcast/doc/ws.drawio1
-rw-r--r--plugins/broadcast/interface.go28
-rw-r--r--plugins/broadcast/memory/bst/bst.go134
-rw-r--r--plugins/broadcast/memory/bst/bst_test.go33
-rw-r--r--plugins/broadcast/memory/bst/interface.go11
-rw-r--r--plugins/broadcast/memory/driver.go28
-rw-r--r--plugins/broadcast/plugin.go16
-rw-r--r--plugins/broadcast/redis/driver.go28
-rw-r--r--plugins/broadcast/ws/commands/leave.go1
-rw-r--r--plugins/broadcast/ws/commands/subscribe.go1
-rw-r--r--plugins/broadcast/ws/connection/connection.go (renamed from plugins/broadcast/ws/connection.go)2
-rw-r--r--plugins/broadcast/ws/plugin.go25
-rw-r--r--plugins/broadcast/ws/subscriber.go41
-rw-r--r--plugins/broadcast/ws/ws_middleware.go13
15 files changed, 300 insertions, 71 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 03bf6510..5e7b7f20 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -3,16 +3,17 @@ package broadcast
/*
broadcast:
ws-us-region-1:
- subscriber: ws
- path: "/ws"
+ subscriber: websockets
+ middleware: ["headers", "gzip"] # ????
+ address: "localhost:53223"
+ path: "/ws"
- driver: redis
+ storage: redis
address:
- 6379
db: 0
*/
-
// Config represents configuration for the ws plugin
type Config struct {
// Sections represent particular broadcast plugin section
diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/broadcast/doc/ws.drawio
new file mode 100644
index 00000000..739b797a
--- /dev/null
+++ b/plugins/broadcast/doc/ws.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 3ed8b412..47c779b5 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -1,25 +1,29 @@
package broadcast
-import "encoding/json"
+import (
+ "encoding/json"
+)
// Subscriber defines the ability to operate as message passing broker.
type Subscriber interface {
// Subscribe broker to one or multiple topics.
- Subscribe(upstream chan *Message, topics ...string) error
-
- // SubscribePattern broker to pattern.
- SubscribePattern(upstream chan *Message, pattern string) error
-
- // Unsubscribe broker from one or multiple topics.
- Unsubscribe(upstream chan *Message, topics ...string) error
-
+ Subscribe(topics ...string) error
// UnsubscribePattern broker from pattern.
- UnsubscribePattern(upstream chan *Message, pattern string) error
+ UnsubscribePattern(pattern string) error
}
+// Storage used to store patterns and topics
type Storage interface {
- Store(topics ...string)
- StorePattern(pattern string)
+ // Store connection uuid associated with the provided topics
+ Store(uuid string, topics ...string)
+ // StorePattern stores pattern associated with the particular connection
+ StorePattern(uuid string, pattern string)
+
+ // GetConnection returns connections for the particular pattern
+ GetConnection(pattern string) []string
+
+ // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example)
+ Construct(key string) (Storage, error)
}
type Publisher interface {
diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go
new file mode 100644
index 00000000..7d09a10f
--- /dev/null
+++ b/plugins/broadcast/memory/bst/bst.go
@@ -0,0 +1,134 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{}
+}
+
+// Insert uuid to the topic
+func (B *BST) Insert(uuid string, topic string) {
+ curr := B
+
+ for {
+ if curr.topic == topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if curr.topic < topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (B *BST) Get(topic string) map[string]struct{} {
+ curr := B
+ for curr != nil {
+ if curr.topic == topic {
+ return curr.uuids
+ }
+ if curr.topic < topic {
+ curr = curr.left
+ }
+ if curr.topic > topic {
+ curr = curr.right
+ }
+ }
+
+ return nil
+}
+
+func (B *BST) Remove(uuid string, topic string) {
+ B.removeHelper(uuid, topic, nil)
+}
+
+func (B *BST) removeHelper(uuid string, topic string, parent *BST) {
+ curr := B
+ for curr != nil {
+ if topic < curr.topic {
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil {
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil {
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else {
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (B *BST) traverseForMinString() (string, map[string]struct{}) {
+ if B.left == nil {
+ return B.topic, B.uuids
+ }
+ return B.left.traverseForMinString()
+}
diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go
new file mode 100644
index 00000000..b5ad6c10
--- /dev/null
+++ b/plugins/broadcast/memory/bst/bst_test.go
@@ -0,0 +1,33 @@
+package bst
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewBST(t *testing.T) {
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/plugins/broadcast/memory/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go
index 2eb45c8e..80527e4b 100644
--- a/plugins/broadcast/memory/driver.go
+++ b/plugins/broadcast/memory/driver.go
@@ -1,39 +1,29 @@
package memory
-import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+import (
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+)
type Driver struct {
}
-func NewInMemoryDriver() broadcast.Subscriber {
+func NewInMemoryDriver() broadcast.Storage {
b := &Driver{}
return b
}
-func (d *Driver) Serve() error {
+func (d *Driver) Store(uuid string, topics ...string) {
panic("implement me")
}
-func (d *Driver) Stop() {
+func (d *Driver) StorePattern(uuid string, pattern string) {
panic("implement me")
}
-func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (d *Driver) GetConnection(pattern string) []string {
panic("implement me")
}
-func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
- panic("implement me")
-}
-
-func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
- panic("implement me")
-}
-
-func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
- panic("implement me")
-}
-
-func (d *Driver) Publish(messages ...*broadcast.Message) error {
- panic("implement me")
+func (d *Driver) Construct(key string) (broadcast.Storage, error) {
+ return nil, nil
}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 7ad9e2ae..156bea80 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -47,6 +47,12 @@ func (p *Plugin) Serve() chan error {
return errCh
}
+ if p.driver == nil {
+ // Or if no storage detected, use in-memory storage
+ errCh <- errors.E(op, errors.Str("no storage detected"))
+ return errCh
+ }
+
// start the underlying broker
go func() {
// err := p.broker.Serve()
@@ -72,12 +78,16 @@ func (p *Plugin) Name() string {
func (p *Plugin) Collects() []interface{} {
return []interface{}{
- p.CollectBroker,
+ p.CollectSubscriber,
}
}
-func (p *Plugin) CollectBroker(name endure.Named, broker Subscriber) {
- p.broker = broker
+func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) {
+ p.broker = subscriber
+}
+
+func (p *Plugin) CollectStorage(name endure.Named, storage Storage) {
+ p.driver = storage
}
func (p *Plugin) RPC() interface{} {
diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go
index 65a229e1..556d5f03 100644
--- a/plugins/broadcast/redis/driver.go
+++ b/plugins/broadcast/redis/driver.go
@@ -1 +1,29 @@
package redis
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+)
+
+type Driver struct {
+}
+
+func NewInMemoryDriver() broadcast.Storage {
+ b := &Driver{}
+ return b
+}
+
+func (d *Driver) Store(uuid string, topics ...string) {
+ panic("implement me")
+}
+
+func (d *Driver) StorePattern(uuid string, pattern string) {
+ panic("implement me")
+}
+
+func (d *Driver) GetConnection(pattern string) []string {
+ panic("implement me")
+}
+
+func (d *Driver) Construct(key string) (broadcast.Storage, error) {
+ panic("implement me")
+}
diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go
new file mode 100644
index 00000000..cdff10da
--- /dev/null
+++ b/plugins/broadcast/ws/commands/leave.go
@@ -0,0 +1 @@
+package commands
diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go
new file mode 100644
index 00000000..cdff10da
--- /dev/null
+++ b/plugins/broadcast/ws/commands/subscribe.go
@@ -0,0 +1 @@
+package commands
diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection/connection.go
index 9f7bf00e..cfb47e35 100644
--- a/plugins/broadcast/ws/connection.go
+++ b/plugins/broadcast/ws/connection/connection.go
@@ -1,4 +1,4 @@
-package ws
+package connection
import (
"sync"
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
index c9a97606..f075864b 100644
--- a/plugins/broadcast/ws/plugin.go
+++ b/plugins/broadcast/ws/plugin.go
@@ -8,10 +8,8 @@ import (
)
const (
- //
RootPluginName = "broadcast"
- //
- PluginName = "websockets"
+ PluginName = "websockets"
)
type Plugin struct {
@@ -21,7 +19,6 @@ type Plugin struct {
cfg config.Configurer
}
-
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("ws_plugin_init")
@@ -36,20 +33,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error {
- errCh := make(chan error)
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
func (p *Plugin) Name() string {
return PluginName
}
+// Provides Provide a ws implementation
func (p *Plugin) Provides() []interface{} {
return []interface{}{
p.Websocket,
@@ -57,9 +45,10 @@ func (p *Plugin) Provides() []interface{} {
}
// Websocket method should provide the Subscriber implementation to the broadcast
-func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
+func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) {
const op = errors.Op("websocket_subscriber_provide")
- ws, err := NewWSSubscriber()
+ // initialize subscriber with the storage
+ ws, err := NewWSSubscriber(storage)
if err != nil {
return nil, errors.E(op, err)
}
@@ -67,6 +56,4 @@ func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
return ws, nil
}
-
-
-func (p *Plugin) Available(){}
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
index 2039cf95..660efdca 100644
--- a/plugins/broadcast/ws/subscriber.go
+++ b/plugins/broadcast/ws/subscriber.go
@@ -1,35 +1,50 @@
package ws
-import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+import (
+ "github.com/gofiber/fiber/v2"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection"
+)
type Subscriber struct {
- connections map[string]*Connection
- storage broadcast.Storage
+ connections map[string]*connection.Connection
+ storage broadcast.Storage
}
-func NewWSSubscriber() (broadcast.Subscriber, error) {
- m := make(map[string]*Connection)
+// config
+//
+func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) {
+ m := make(map[string]*connection.Connection)
+
+ go func() {
+ app := fiber.New()
+ app.Use("/ws", wsMiddleware)
+ app.Listen(":8080")
+ }()
+
return &Subscriber{
connections: m,
+ storage: storage,
}, nil
}
-func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (s *Subscriber) Subscribe(topics ...string) error {
panic("implement me")
-
-
-
-
}
-func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+func (s *Subscriber) SubscribePattern(pattern string) error {
panic("implement me")
}
-func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (s *Subscriber) Unsubscribe(topics ...string) error {
panic("implement me")
}
-func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+func (s *Subscriber) UnsubscribePattern(pattern string) error {
panic("implement me")
}
+
+func (s *Subscriber) Publish(messages ...*broadcast.Message) error {
+ s.storage.GetConnection(messages[9].Topic)
+ return nil
+}
diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go
new file mode 100644
index 00000000..068ef9fb
--- /dev/null
+++ b/plugins/broadcast/ws/ws_middleware.go
@@ -0,0 +1,13 @@
+package ws
+
+import (
+ "github.com/gofiber/fiber/v2"
+ "github.com/gofiber/websocket/v2"
+)
+
+func wsMiddleware(c *fiber.Ctx) error {
+ if websocket.IsWebSocketUpgrade(c) {
+ return c.Next()
+ }
+ return fiber.ErrUpgradeRequired
+}