diff options
Diffstat (limited to 'plugins/broadcast/ws')
-rw-r--r-- | plugins/broadcast/ws/commands/join.go | 10 | ||||
-rw-r--r-- | plugins/broadcast/ws/commands/leave.go | 1 | ||||
-rw-r--r-- | plugins/broadcast/ws/commands/subscribe.go | 1 | ||||
-rw-r--r-- | plugins/broadcast/ws/config.go | 26 | ||||
-rw-r--r-- | plugins/broadcast/ws/connection/connection.go | 69 | ||||
-rw-r--r-- | plugins/broadcast/ws/plugin.go | 59 | ||||
-rw-r--r-- | plugins/broadcast/ws/subscriber.go | 50 | ||||
-rw-r--r-- | plugins/broadcast/ws/ws_middleware.go | 13 |
8 files changed, 0 insertions, 229 deletions
diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go deleted file mode 100644 index 25943f0a..00000000 --- a/plugins/broadcast/ws/commands/join.go +++ /dev/null @@ -1,10 +0,0 @@ -package commands - -// Join command to save the connection -type Join struct { - Command string `mapstructure:"command"` -} - -func JoinCommand() { - -} diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go deleted file mode 100644 index cdff10da..00000000 --- a/plugins/broadcast/ws/commands/leave.go +++ /dev/null @@ -1 +0,0 @@ -package commands diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go deleted file mode 100644 index cdff10da..00000000 --- a/plugins/broadcast/ws/commands/subscribe.go +++ /dev/null @@ -1 +0,0 @@ -package commands diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go deleted file mode 100644 index 1d4132b4..00000000 --- a/plugins/broadcast/ws/config.go +++ /dev/null @@ -1,26 +0,0 @@ -package ws - -/* -broadcast: - ws-us-region-1: - subscriber: ws - path: "/ws" - - driver: redis - address: - - 6379 - db: 0 -*/ - -// Config represents configuration for the ws plugin -type Config struct { - // http path for the websocket - Path string `mapstructure:"Path"` -} - -// InitDefault initialize default values for the ws config -func (c *Config) InitDefault() { - if c.Path == "" { - c.Path = "/ws" - } -} diff --git a/plugins/broadcast/ws/connection/connection.go b/plugins/broadcast/ws/connection/connection.go deleted file mode 100644 index cfb47e35..00000000 --- a/plugins/broadcast/ws/connection/connection.go +++ /dev/null @@ -1,69 +0,0 @@ -package connection - -import ( - "sync" - - "github.com/gofiber/websocket/v2" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// Connection represents wrapped and safe to use from the different threads websocket connection -type Connection struct { - sync.RWMutex - log logger.Logger - conn *websocket.Conn -} - -func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { - return &Connection{ - conn: wsConn, - log: log, - } -} - -func (c *Connection) Write(mt int, data []byte) error { - c.Lock() - defer c.Unlock() - - const op = errors.Op("websocket_write") - // handle a case when a goroutine tried to write into the closed connection - defer func() { - if r := recover(); r != nil { - c.log.Warn("panic handled, tried to write into the closed connection") - } - }() - - err := c.conn.WriteMessage(mt, data) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *Connection) Read() (int, []byte, error) { - c.RLock() - defer c.RUnlock() - const op = errors.Op("websocket_read") - - mt, data, err := c.conn.ReadMessage() - if err != nil { - return -1, nil, errors.E(op, err) - } - - return mt, data, nil -} - -func (c *Connection) Close() error { - c.Lock() - defer c.Unlock() - const op = errors.Op("websocket_close") - - err := c.conn.Close() - if err != nil { - return errors.E(op, err) - } - - return nil -} diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go deleted file mode 100644 index f075864b..00000000 --- a/plugins/broadcast/ws/plugin.go +++ /dev/null @@ -1,59 +0,0 @@ -package ws - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - RootPluginName = "broadcast" - PluginName = "websockets" -) - -type Plugin struct { - // logger - log logger.Logger - // configurer plugin - cfg config.Configurer -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("ws_plugin_init") - - // check for the configuration section existence - if !cfg.Has(RootPluginName) { - return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration")) - } - - p.cfg = cfg - p.log = log - - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -// Provides Provide a ws implementation -func (p *Plugin) Provides() []interface{} { - return []interface{}{ - p.Websocket, - } -} - -// Websocket method should provide the Subscriber implementation to the broadcast -func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) { - const op = errors.Op("websocket_subscriber_provide") - // initialize subscriber with the storage - ws, err := NewWSSubscriber(storage) - if err != nil { - return nil, errors.E(op, err) - } - - return ws, nil -} - -func (p *Plugin) Available() {} diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go deleted file mode 100644 index 660efdca..00000000 --- a/plugins/broadcast/ws/subscriber.go +++ /dev/null @@ -1,50 +0,0 @@ -package ws - -import ( - "github.com/gofiber/fiber/v2" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection" -) - -type Subscriber struct { - connections map[string]*connection.Connection - storage broadcast.Storage -} - -// config -// -func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) { - m := make(map[string]*connection.Connection) - - go func() { - app := fiber.New() - app.Use("/ws", wsMiddleware) - app.Listen(":8080") - }() - - return &Subscriber{ - connections: m, - storage: storage, - }, nil -} - -func (s *Subscriber) Subscribe(topics ...string) error { - panic("implement me") -} - -func (s *Subscriber) SubscribePattern(pattern string) error { - panic("implement me") -} - -func (s *Subscriber) Unsubscribe(topics ...string) error { - panic("implement me") -} - -func (s *Subscriber) UnsubscribePattern(pattern string) error { - panic("implement me") -} - -func (s *Subscriber) Publish(messages ...*broadcast.Message) error { - s.storage.GetConnection(messages[9].Topic) - return nil -} diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go deleted file mode 100644 index 068ef9fb..00000000 --- a/plugins/broadcast/ws/ws_middleware.go +++ /dev/null @@ -1,13 +0,0 @@ -package ws - -import ( - "github.com/gofiber/fiber/v2" - "github.com/gofiber/websocket/v2" -) - -func wsMiddleware(c *fiber.Ctx) error { - if websocket.IsWebSocketUpgrade(c) { - return c.Next() - } - return fiber.ErrUpgradeRequired -} |