summaryrefslogtreecommitdiff
path: root/plugins/broadcast/ws
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/ws')
-rw-r--r--plugins/broadcast/ws/commands/join.go10
-rw-r--r--plugins/broadcast/ws/commands/leave.go1
-rw-r--r--plugins/broadcast/ws/commands/subscribe.go1
-rw-r--r--plugins/broadcast/ws/config.go26
-rw-r--r--plugins/broadcast/ws/connection/connection.go69
-rw-r--r--plugins/broadcast/ws/plugin.go59
-rw-r--r--plugins/broadcast/ws/subscriber.go50
-rw-r--r--plugins/broadcast/ws/ws_middleware.go13
8 files changed, 0 insertions, 229 deletions
diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go
deleted file mode 100644
index 25943f0a..00000000
--- a/plugins/broadcast/ws/commands/join.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package commands
-
-// Join command to save the connection
-type Join struct {
- Command string `mapstructure:"command"`
-}
-
-func JoinCommand() {
-
-}
diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go
deleted file mode 100644
index cdff10da..00000000
--- a/plugins/broadcast/ws/commands/leave.go
+++ /dev/null
@@ -1 +0,0 @@
-package commands
diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go
deleted file mode 100644
index cdff10da..00000000
--- a/plugins/broadcast/ws/commands/subscribe.go
+++ /dev/null
@@ -1 +0,0 @@
-package commands
diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go
deleted file mode 100644
index 1d4132b4..00000000
--- a/plugins/broadcast/ws/config.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package ws
-
-/*
-broadcast:
- ws-us-region-1:
- subscriber: ws
- path: "/ws"
-
- driver: redis
- address:
- - 6379
- db: 0
-*/
-
-// Config represents configuration for the ws plugin
-type Config struct {
- // http path for the websocket
- Path string `mapstructure:"Path"`
-}
-
-// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
- if c.Path == "" {
- c.Path = "/ws"
- }
-}
diff --git a/plugins/broadcast/ws/connection/connection.go b/plugins/broadcast/ws/connection/connection.go
deleted file mode 100644
index cfb47e35..00000000
--- a/plugins/broadcast/ws/connection/connection.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package connection
-
-import (
- "sync"
-
- "github.com/gofiber/websocket/v2"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-// Connection represents wrapped and safe to use from the different threads websocket connection
-type Connection struct {
- sync.RWMutex
- log logger.Logger
- conn *websocket.Conn
-}
-
-func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
- return &Connection{
- conn: wsConn,
- log: log,
- }
-}
-
-func (c *Connection) Write(mt int, data []byte) error {
- c.Lock()
- defer c.Unlock()
-
- const op = errors.Op("websocket_write")
- // handle a case when a goroutine tried to write into the closed connection
- defer func() {
- if r := recover(); r != nil {
- c.log.Warn("panic handled, tried to write into the closed connection")
- }
- }()
-
- err := c.conn.WriteMessage(mt, data)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (c *Connection) Read() (int, []byte, error) {
- c.RLock()
- defer c.RUnlock()
- const op = errors.Op("websocket_read")
-
- mt, data, err := c.conn.ReadMessage()
- if err != nil {
- return -1, nil, errors.E(op, err)
- }
-
- return mt, data, nil
-}
-
-func (c *Connection) Close() error {
- c.Lock()
- defer c.Unlock()
- const op = errors.Op("websocket_close")
-
- err := c.conn.Close()
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
deleted file mode 100644
index f075864b..00000000
--- a/plugins/broadcast/ws/plugin.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package ws
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- RootPluginName = "broadcast"
- PluginName = "websockets"
-)
-
-type Plugin struct {
- // logger
- log logger.Logger
- // configurer plugin
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("ws_plugin_init")
-
- // check for the configuration section existence
- if !cfg.Has(RootPluginName) {
- return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration"))
- }
-
- p.cfg = cfg
- p.log = log
-
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// Provides Provide a ws implementation
-func (p *Plugin) Provides() []interface{} {
- return []interface{}{
- p.Websocket,
- }
-}
-
-// Websocket method should provide the Subscriber implementation to the broadcast
-func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) {
- const op = errors.Op("websocket_subscriber_provide")
- // initialize subscriber with the storage
- ws, err := NewWSSubscriber(storage)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return ws, nil
-}
-
-func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
deleted file mode 100644
index 660efdca..00000000
--- a/plugins/broadcast/ws/subscriber.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package ws
-
-import (
- "github.com/gofiber/fiber/v2"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection"
-)
-
-type Subscriber struct {
- connections map[string]*connection.Connection
- storage broadcast.Storage
-}
-
-// config
-//
-func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) {
- m := make(map[string]*connection.Connection)
-
- go func() {
- app := fiber.New()
- app.Use("/ws", wsMiddleware)
- app.Listen(":8080")
- }()
-
- return &Subscriber{
- connections: m,
- storage: storage,
- }, nil
-}
-
-func (s *Subscriber) Subscribe(topics ...string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) SubscribePattern(pattern string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) Unsubscribe(topics ...string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) UnsubscribePattern(pattern string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) Publish(messages ...*broadcast.Message) error {
- s.storage.GetConnection(messages[9].Topic)
- return nil
-}
diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go
deleted file mode 100644
index 068ef9fb..00000000
--- a/plugins/broadcast/ws/ws_middleware.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package ws
-
-import (
- "github.com/gofiber/fiber/v2"
- "github.com/gofiber/websocket/v2"
-)
-
-func wsMiddleware(c *fiber.Ctx) error {
- if websocket.IsWebSocketUpgrade(c) {
- return c.Next()
- }
- return fiber.ErrUpgradeRequired
-}