diff options
Diffstat (limited to 'plugins/broadcast')
-rw-r--r-- | plugins/broadcast/config.go | 9 | ||||
-rw-r--r-- | plugins/broadcast/doc/ws.drawio | 1 | ||||
-rw-r--r-- | plugins/broadcast/interface.go | 28 | ||||
-rw-r--r-- | plugins/broadcast/memory/bst/bst.go | 134 | ||||
-rw-r--r-- | plugins/broadcast/memory/bst/bst_test.go | 33 | ||||
-rw-r--r-- | plugins/broadcast/memory/bst/interface.go | 11 | ||||
-rw-r--r-- | plugins/broadcast/memory/driver.go | 28 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 16 | ||||
-rw-r--r-- | plugins/broadcast/redis/driver.go | 28 | ||||
-rw-r--r-- | plugins/broadcast/ws/commands/leave.go | 1 | ||||
-rw-r--r-- | plugins/broadcast/ws/commands/subscribe.go | 1 | ||||
-rw-r--r-- | plugins/broadcast/ws/connection/connection.go (renamed from plugins/broadcast/ws/connection.go) | 2 | ||||
-rw-r--r-- | plugins/broadcast/ws/plugin.go | 25 | ||||
-rw-r--r-- | plugins/broadcast/ws/subscriber.go | 41 | ||||
-rw-r--r-- | plugins/broadcast/ws/ws_middleware.go | 13 |
15 files changed, 300 insertions, 71 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go index 03bf6510..5e7b7f20 100644 --- a/plugins/broadcast/config.go +++ b/plugins/broadcast/config.go @@ -3,16 +3,17 @@ package broadcast /* broadcast: ws-us-region-1: - subscriber: ws - path: "/ws" + subscriber: websockets + middleware: ["headers", "gzip"] # ???? + address: "localhost:53223" + path: "/ws" - driver: redis + storage: redis address: - 6379 db: 0 */ - // Config represents configuration for the ws plugin type Config struct { // Sections represent particular broadcast plugin section diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/broadcast/doc/ws.drawio new file mode 100644 index 00000000..739b797a --- /dev/null +++ b/plugins/broadcast/doc/ws.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 3ed8b412..47c779b5 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -1,25 +1,29 @@ package broadcast -import "encoding/json" +import ( + "encoding/json" +) // Subscriber defines the ability to operate as message passing broker. type Subscriber interface { // Subscribe broker to one or multiple topics. - Subscribe(upstream chan *Message, topics ...string) error - - // SubscribePattern broker to pattern. - SubscribePattern(upstream chan *Message, pattern string) error - - // Unsubscribe broker from one or multiple topics. - Unsubscribe(upstream chan *Message, topics ...string) error - + Subscribe(topics ...string) error // UnsubscribePattern broker from pattern. - UnsubscribePattern(upstream chan *Message, pattern string) error + UnsubscribePattern(pattern string) error } +// Storage used to store patterns and topics type Storage interface { - Store(topics ...string) - StorePattern(pattern string) + // Store connection uuid associated with the provided topics + Store(uuid string, topics ...string) + // StorePattern stores pattern associated with the particular connection + StorePattern(uuid string, pattern string) + + // GetConnection returns connections for the particular pattern + GetConnection(pattern string) []string + + // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example) + Construct(key string) (Storage, error) } type Publisher interface { diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go new file mode 100644 index 00000000..7d09a10f --- /dev/null +++ b/plugins/broadcast/memory/bst/bst.go @@ -0,0 +1,134 @@ +package bst + +// BST ... +type BST struct { + // registered topic, not unique + topic string + // associated connections with the topic + uuids map[string]struct{} + + // left and right subtrees + left *BST + right *BST +} + +func NewBST() Storage { + return &BST{} +} + +// Insert uuid to the topic +func (B *BST) Insert(uuid string, topic string) { + curr := B + + for { + if curr.topic == topic { + curr.uuids[uuid] = struct{}{} + return + } + // if topic less than curr topic + if curr.topic < topic { + if curr.left == nil { + curr.left = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + // move forward + curr = curr.left + } else { + if curr.right == nil { + curr.right = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + + curr = curr.right + } + } +} + +func (B *BST) Get(topic string) map[string]struct{} { + curr := B + for curr != nil { + if curr.topic == topic { + return curr.uuids + } + if curr.topic < topic { + curr = curr.left + } + if curr.topic > topic { + curr = curr.right + } + } + + return nil +} + +func (B *BST) Remove(uuid string, topic string) { + B.removeHelper(uuid, topic, nil) +} + +func (B *BST) removeHelper(uuid string, topic string, parent *BST) { + curr := B + for curr != nil { + if topic < curr.topic { + parent = curr + curr = curr.left + } else if topic > curr.topic { + parent = curr + curr = curr.right + } else { + if len(curr.uuids) > 1 { + if _, ok := curr.uuids[uuid]; ok { + delete(curr.uuids, uuid) + return + } + } + + if curr.left != nil && curr.right != nil { + curr.topic, curr.uuids = curr.right.traverseForMinString() + curr.right.removeHelper(curr.topic, uuid, curr) + } else if parent == nil { + if curr.left != nil { + curr.topic = curr.left.topic + curr.uuids = curr.left.uuids + + curr.right = curr.left.right + curr.left = curr.left.left + } else if curr.right != nil { + curr.topic = curr.right.topic + curr.uuids = curr.right.uuids + + curr.left = curr.right.left + curr.right = curr.right.right + } else { + // single node tree + } + } else if parent.left == curr { + if curr.left != nil { + parent.left = curr.left + } else { + parent.left = curr.right + } + } else if parent.right == curr { + if curr.left != nil { + parent.right = curr.left + } else { + parent.right = curr.right + } + } + break + } + } +} + +//go:inline +func (B *BST) traverseForMinString() (string, map[string]struct{}) { + if B.left == nil { + return B.topic, B.uuids + } + return B.left.traverseForMinString() +} diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go new file mode 100644 index 00000000..b5ad6c10 --- /dev/null +++ b/plugins/broadcast/memory/bst/bst_test.go @@ -0,0 +1,33 @@ +package bst + +import ( + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestNewBST(t *testing.T) { + g := NewBST() + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments2") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments3") + } + + exist := g.Get("comments") + assert.Len(t, exist, 100) + + exist2 := g.Get("comments2") + assert.Len(t, exist2, 100) + + exist3 := g.Get("comments3") + assert.Len(t, exist3, 100) +} diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go new file mode 100644 index 00000000..ecf40414 --- /dev/null +++ b/plugins/broadcast/memory/bst/interface.go @@ -0,0 +1,11 @@ +package bst + +// Storage is general in-memory BST storage implementation +type Storage interface { + // Insert inserts to a vertex with topic ident connection uuid + Insert(uuid string, topic string) + // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed + Remove(uuid, topic string) + // Get will return all connections associated with the topic + Get(topic string) map[string]struct{} +} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go index 2eb45c8e..80527e4b 100644 --- a/plugins/broadcast/memory/driver.go +++ b/plugins/broadcast/memory/driver.go @@ -1,39 +1,29 @@ package memory -import "github.com/spiral/roadrunner/v2/plugins/broadcast" +import ( + "github.com/spiral/roadrunner/v2/plugins/broadcast" +) type Driver struct { } -func NewInMemoryDriver() broadcast.Subscriber { +func NewInMemoryDriver() broadcast.Storage { b := &Driver{} return b } -func (d *Driver) Serve() error { +func (d *Driver) Store(uuid string, topics ...string) { panic("implement me") } -func (d *Driver) Stop() { +func (d *Driver) StorePattern(uuid string, pattern string) { panic("implement me") } -func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error { +func (d *Driver) GetConnection(pattern string) []string { panic("implement me") } -func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { - panic("implement me") -} - -func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { - panic("implement me") -} - -func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { - panic("implement me") -} - -func (d *Driver) Publish(messages ...*broadcast.Message) error { - panic("implement me") +func (d *Driver) Construct(key string) (broadcast.Storage, error) { + return nil, nil } diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 7ad9e2ae..156bea80 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -47,6 +47,12 @@ func (p *Plugin) Serve() chan error { return errCh } + if p.driver == nil { + // Or if no storage detected, use in-memory storage + errCh <- errors.E(op, errors.Str("no storage detected")) + return errCh + } + // start the underlying broker go func() { // err := p.broker.Serve() @@ -72,12 +78,16 @@ func (p *Plugin) Name() string { func (p *Plugin) Collects() []interface{} { return []interface{}{ - p.CollectBroker, + p.CollectSubscriber, } } -func (p *Plugin) CollectBroker(name endure.Named, broker Subscriber) { - p.broker = broker +func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) { + p.broker = subscriber +} + +func (p *Plugin) CollectStorage(name endure.Named, storage Storage) { + p.driver = storage } func (p *Plugin) RPC() interface{} { diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go index 65a229e1..556d5f03 100644 --- a/plugins/broadcast/redis/driver.go +++ b/plugins/broadcast/redis/driver.go @@ -1 +1,29 @@ package redis + +import ( + "github.com/spiral/roadrunner/v2/plugins/broadcast" +) + +type Driver struct { +} + +func NewInMemoryDriver() broadcast.Storage { + b := &Driver{} + return b +} + +func (d *Driver) Store(uuid string, topics ...string) { + panic("implement me") +} + +func (d *Driver) StorePattern(uuid string, pattern string) { + panic("implement me") +} + +func (d *Driver) GetConnection(pattern string) []string { + panic("implement me") +} + +func (d *Driver) Construct(key string) (broadcast.Storage, error) { + panic("implement me") +} diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go new file mode 100644 index 00000000..cdff10da --- /dev/null +++ b/plugins/broadcast/ws/commands/leave.go @@ -0,0 +1 @@ +package commands diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go new file mode 100644 index 00000000..cdff10da --- /dev/null +++ b/plugins/broadcast/ws/commands/subscribe.go @@ -0,0 +1 @@ +package commands diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection/connection.go index 9f7bf00e..cfb47e35 100644 --- a/plugins/broadcast/ws/connection.go +++ b/plugins/broadcast/ws/connection/connection.go @@ -1,4 +1,4 @@ -package ws +package connection import ( "sync" diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go index c9a97606..f075864b 100644 --- a/plugins/broadcast/ws/plugin.go +++ b/plugins/broadcast/ws/plugin.go @@ -8,10 +8,8 @@ import ( ) const ( - // RootPluginName = "broadcast" - // - PluginName = "websockets" + PluginName = "websockets" ) type Plugin struct { @@ -21,7 +19,6 @@ type Plugin struct { cfg config.Configurer } - func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("ws_plugin_init") @@ -36,20 +33,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { - errCh := make(chan error) - - return errCh -} - -func (p *Plugin) Stop() error { - return nil -} - func (p *Plugin) Name() string { return PluginName } +// Provides Provide a ws implementation func (p *Plugin) Provides() []interface{} { return []interface{}{ p.Websocket, @@ -57,9 +45,10 @@ func (p *Plugin) Provides() []interface{} { } // Websocket method should provide the Subscriber implementation to the broadcast -func (p *Plugin) Websocket() (broadcast.Subscriber, error) { +func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) { const op = errors.Op("websocket_subscriber_provide") - ws, err := NewWSSubscriber() + // initialize subscriber with the storage + ws, err := NewWSSubscriber(storage) if err != nil { return nil, errors.E(op, err) } @@ -67,6 +56,4 @@ func (p *Plugin) Websocket() (broadcast.Subscriber, error) { return ws, nil } - - -func (p *Plugin) Available(){} +func (p *Plugin) Available() {} diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go index 2039cf95..660efdca 100644 --- a/plugins/broadcast/ws/subscriber.go +++ b/plugins/broadcast/ws/subscriber.go @@ -1,35 +1,50 @@ package ws -import "github.com/spiral/roadrunner/v2/plugins/broadcast" +import ( + "github.com/gofiber/fiber/v2" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection" +) type Subscriber struct { - connections map[string]*Connection - storage broadcast.Storage + connections map[string]*connection.Connection + storage broadcast.Storage } -func NewWSSubscriber() (broadcast.Subscriber, error) { - m := make(map[string]*Connection) +// config +// +func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) { + m := make(map[string]*connection.Connection) + + go func() { + app := fiber.New() + app.Use("/ws", wsMiddleware) + app.Listen(":8080") + }() + return &Subscriber{ connections: m, + storage: storage, }, nil } -func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error { +func (s *Subscriber) Subscribe(topics ...string) error { panic("implement me") - - - - } -func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { +func (s *Subscriber) SubscribePattern(pattern string) error { panic("implement me") } -func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { +func (s *Subscriber) Unsubscribe(topics ...string) error { panic("implement me") } -func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { +func (s *Subscriber) UnsubscribePattern(pattern string) error { panic("implement me") } + +func (s *Subscriber) Publish(messages ...*broadcast.Message) error { + s.storage.GetConnection(messages[9].Topic) + return nil +} diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go new file mode 100644 index 00000000..068ef9fb --- /dev/null +++ b/plugins/broadcast/ws/ws_middleware.go @@ -0,0 +1,13 @@ +package ws + +import ( + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" +) + +func wsMiddleware(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + return c.Next() + } + return fiber.ErrUpgradeRequired +} |