summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-18 15:36:06 +0300
committerValery Piashchynski <[email protected]>2021-05-18 15:36:06 +0300
commitf85172106b4723b705aa75c3c310e8cebd050a8d (patch)
treeaa98200d5d01776d4328c31c79b8af52986a4375 /plugins/broadcast
parent60a3793f83c672e24283c009f53e780a4932be50 (diff)
- Add protected connection
- Update arch diagram - Update interfaces Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/config.go16
-rw-r--r--plugins/broadcast/interface.go3
-rw-r--r--plugins/broadcast/ws/commands/join.go10
-rw-r--r--plugins/broadcast/ws/config.go25
-rw-r--r--plugins/broadcast/ws/connection.go69
-rw-r--r--plugins/broadcast/ws/plugin.go71
-rw-r--r--plugins/broadcast/ws/subscriber.go34
7 files changed, 227 insertions, 1 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 92f7951d..03bf6510 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -1,6 +1,22 @@
package broadcast
+/*
+broadcast:
+ ws-us-region-1:
+ subscriber: ws
+ path: "/ws"
+
+ driver: redis
+ address:
+ - 6379
+ db: 0
+*/
+
+
+// Config represents configuration for the ws plugin
type Config struct {
+ // Sections represent particular broadcast plugin section
+ Sections map[string]interface{} `mapstructure:"sections"`
}
func (c *Config) InitDefaults() {
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 716b3aac..3ed8b412 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -18,7 +18,8 @@ type Subscriber interface {
}
type Storage interface {
- Store()
+ Store(topics ...string)
+ StorePattern(pattern string)
}
type Publisher interface {
diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go
new file mode 100644
index 00000000..25943f0a
--- /dev/null
+++ b/plugins/broadcast/ws/commands/join.go
@@ -0,0 +1,10 @@
+package commands
+
+// Join command to save the connection
+type Join struct {
+ Command string `mapstructure:"command"`
+}
+
+func JoinCommand() {
+
+}
diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go
index 98592950..1d4132b4 100644
--- a/plugins/broadcast/ws/config.go
+++ b/plugins/broadcast/ws/config.go
@@ -1 +1,26 @@
package ws
+
+/*
+broadcast:
+ ws-us-region-1:
+ subscriber: ws
+ path: "/ws"
+
+ driver: redis
+ address:
+ - 6379
+ db: 0
+*/
+
+// Config represents configuration for the ws plugin
+type Config struct {
+ // http path for the websocket
+ Path string `mapstructure:"Path"`
+}
+
+// InitDefault initialize default values for the ws config
+func (c *Config) InitDefault() {
+ if c.Path == "" {
+ c.Path = "/ws"
+ }
+}
diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection.go
new file mode 100644
index 00000000..9f7bf00e
--- /dev/null
+++ b/plugins/broadcast/ws/connection.go
@@ -0,0 +1,69 @@
+package ws
+
+import (
+ "sync"
+
+ "github.com/gofiber/websocket/v2"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Connection represents wrapped and safe to use from the different threads websocket connection
+type Connection struct {
+ sync.RWMutex
+ log logger.Logger
+ conn *websocket.Conn
+}
+
+func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
+ return &Connection{
+ conn: wsConn,
+ log: log,
+ }
+}
+
+func (c *Connection) Write(mt int, data []byte) error {
+ c.Lock()
+ defer c.Unlock()
+
+ const op = errors.Op("websocket_write")
+ // handle a case when a goroutine tried to write into the closed connection
+ defer func() {
+ if r := recover(); r != nil {
+ c.log.Warn("panic handled, tried to write into the closed connection")
+ }
+ }()
+
+ err := c.conn.WriteMessage(mt, data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *Connection) Read() (int, []byte, error) {
+ c.RLock()
+ defer c.RUnlock()
+ const op = errors.Op("websocket_read")
+
+ mt, data, err := c.conn.ReadMessage()
+ if err != nil {
+ return -1, nil, errors.E(op, err)
+ }
+
+ return mt, data, nil
+}
+
+func (c *Connection) Close() error {
+ c.Lock()
+ defer c.Unlock()
+ const op = errors.Op("websocket_close")
+
+ err := c.conn.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
index 98592950..c9a97606 100644
--- a/plugins/broadcast/ws/plugin.go
+++ b/plugins/broadcast/ws/plugin.go
@@ -1 +1,72 @@
package ws
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ //
+ RootPluginName = "broadcast"
+ //
+ PluginName = "websockets"
+)
+
+type Plugin struct {
+ // logger
+ log logger.Logger
+ // configurer plugin
+ cfg config.Configurer
+}
+
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("ws_plugin_init")
+
+ // check for the configuration section existence
+ if !cfg.Has(RootPluginName) {
+ return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration"))
+ }
+
+ p.cfg = cfg
+ p.log = log
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Provides() []interface{} {
+ return []interface{}{
+ p.Websocket,
+ }
+}
+
+// Websocket method should provide the Subscriber implementation to the broadcast
+func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
+ const op = errors.Op("websocket_subscriber_provide")
+ ws, err := NewWSSubscriber()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return ws, nil
+}
+
+
+
+func (p *Plugin) Available(){}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
index 98592950..2039cf95 100644
--- a/plugins/broadcast/ws/subscriber.go
+++ b/plugins/broadcast/ws/subscriber.go
@@ -1 +1,35 @@
package ws
+
+import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+
+type Subscriber struct {
+ connections map[string]*Connection
+ storage broadcast.Storage
+}
+
+func NewWSSubscriber() (broadcast.Subscriber, error) {
+ m := make(map[string]*Connection)
+ return &Subscriber{
+ connections: m,
+ }, nil
+}
+
+func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+ panic("implement me")
+
+
+
+
+}
+
+func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+ panic("implement me")
+}
+
+func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
+ panic("implement me")
+}
+
+func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+ panic("implement me")
+}