summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/connection/connection.go2
-rw-r--r--plugins/websockets/executor/executor.go33
-rw-r--r--plugins/websockets/plugin.go41
-rw-r--r--plugins/websockets/storage/storage.go2
4 files changed, 24 insertions, 54 deletions
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
index 5eb30c61..2b847173 100644
--- a/plugins/websockets/connection/connection.go
+++ b/plugins/websockets/connection/connection.go
@@ -43,8 +43,6 @@ func (c *Connection) Write(mt int, data []byte) error {
}
func (c *Connection) Read() (int, []byte, error) {
- //c.RLock()
- //defer c.RUnlock()
const op = errors.Op("websocket_read")
mt, data, err := c.conn.ReadMessage()
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 391c9a8c..9ef5e40a 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -22,11 +22,11 @@ type Executor struct {
// associated connection ID
connID string
- pubsub pubsub.PubSub
+ pubsub map[string]pubsub.PubSub
}
// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor {
return &Executor{
conn: conn,
connID: connID,
@@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error {
switch msg.Command() {
// handle leave
case commands.Join:
- // TODO access validators model update
- //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
- //// validation error
- //if err != nil {
- // e.log.Error("validation error", "error", err)
- //
- // resp := &Response{
- // Topic: "#join",
- // Payload: msg.Topics(),
- // }
- //
- // packet, err := json.Marshal(resp)
- // if err != nil {
- // e.log.Error("error marshal the body", "error", err)
- // return err
- // }
- //
- // err = e.conn.Write(websocket.BinaryMessage, packet)
- // if err != nil {
- // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
- // continue
- // }
- //
- // continue
- //}
// associate connection with topics
e.storage.Store(e.connID, msg.Topics())
@@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub.Subscribe(msg.Topics()...)
+ err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
continue
@@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub.Unsubscribe(msg.Topics()...)
+ err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
continue
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index a247da69..bc5028e6 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -58,28 +58,25 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
func (p *Plugin) Serve() chan error {
errCh := make(chan error)
- go func() {
- ps := p.pubsubs["redis"]
-
- for {
- // get message
- // get topic
- // get connection uuid from the storage by the topic
- // write payload into the connection
- // do that in the workers pool
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- if data == nil {
- continue
- }
+ // run all pubsubs drivers
+ for _, v := range p.pubsubs {
+ go func(ps pubsub.PubSub) {
+ for {
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- p.workersPool.Queue(data)
- }
- }()
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }(v)
+ }
return errCh
}
@@ -156,7 +153,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// Executor wraps a connection to have a safe abstraction
p.Lock()
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"])
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs)
p.Unlock()
p.log.Info("websocket client connected", "uuid", connectionID)
@@ -169,6 +166,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
})
}
+// Publish is an entry point to the websocket PUBSUB
func (p *Plugin) Publish(msg []pubsub.Message) error {
p.Lock()
defer p.Unlock()
@@ -199,7 +197,6 @@ func (p *Plugin) PublishAsync(msg []pubsub.Message) {
p.log.Error("publish async error", "error", err)
return
}
-
}
}
}()
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
index a7e49207..34f53cfd 100644
--- a/plugins/websockets/storage/storage.go
+++ b/plugins/websockets/storage/storage.go
@@ -3,7 +3,7 @@ package storage
import (
"sync"
- "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+ "github.com/spiral/roadrunner/v2/pkg/bst"
)
type Storage struct {