summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
committerValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
commit352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch)
treed940de0ee304d3edb60daa35568c3f186dc6a8b5 /plugins/websockets/plugin.go
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go49
1 files changed, 29 insertions, 20 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 9b21ff8f..16cde0cc 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -15,6 +15,7 @@ import (
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -25,6 +26,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -284,39 +286,46 @@ func (p *Plugin) Reset() error {
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- if br, ok := p.pubsubs[msg[i].Broker]; ok {
- err := br.Publish(msg)
+ // Get payload
+ fbsMsg := message.GetRootAsMessages(m, 0)
+ tmpMsg := &message.Message{}
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ fbsMsg.Messages(tmpMsg, i)
+
+ for j := 0; j < tmpMsg.TopicsLength(); j++ {
+ if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
+ table := tmpMsg.Table()
+ err := br.Publish(table.ByteVector(0))
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
}
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
- go func() {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- err := p.pubsubs[msg[i].Broker].Publish(msg)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- return
- }
- }
- }
- }()
+func (p *Plugin) PublishAsync(msg []byte) {
+ //go func() {
+ // p.Lock()
+ // defer p.Unlock()
+ // for i := 0; i < len(msg); i++ {
+ // for j := 0; j < len(msg[i].Topics); j++ {
+ // err := p.pubsubs[msg[i].Broker].Publish(msg)
+ // if err != nil {
+ // p.log.Error("publish async error", "error", err)
+ // return
+ // }
+ // }
+ // }
+ //}()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {