summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 13:26:12 +0300
committerValery Piashchynski <[email protected]>2021-05-27 13:26:12 +0300
commit0a9aea326045e56716f0736f7aa8520305362c51 (patch)
tree532ca326690d81e97136248dd798d23a56843278 /plugins/websockets/plugin.go
parent57a30c2b49c36161b3af3e539a8618c2d39a5cc9 (diff)
- Move bst to the pkg folder
- Add comments - Fix all golangci-lint warnings Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go41
1 files changed, 19 insertions, 22 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index a247da69..bc5028e6 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -58,28 +58,25 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
func (p *Plugin) Serve() chan error {
errCh := make(chan error)
- go func() {
- ps := p.pubsubs["redis"]
-
- for {
- // get message
- // get topic
- // get connection uuid from the storage by the topic
- // write payload into the connection
- // do that in the workers pool
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- if data == nil {
- continue
- }
+ // run all pubsubs drivers
+ for _, v := range p.pubsubs {
+ go func(ps pubsub.PubSub) {
+ for {
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- p.workersPool.Queue(data)
- }
- }()
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }(v)
+ }
return errCh
}
@@ -156,7 +153,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// Executor wraps a connection to have a safe abstraction
p.Lock()
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"])
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs)
p.Unlock()
p.log.Info("websocket client connected", "uuid", connectionID)
@@ -169,6 +166,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
})
}
+// Publish is an entry point to the websocket PUBSUB
func (p *Plugin) Publish(msg []pubsub.Message) error {
p.Lock()
defer p.Unlock()
@@ -199,7 +197,6 @@ func (p *Plugin) PublishAsync(msg []pubsub.Message) {
p.log.Error("publish async error", "error", err)
return
}
-
}
}
}()