diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
commit | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch) | |
tree | d940de0ee304d3edb60daa35568c3f186dc6a8b5 /plugins/websockets/plugin.go | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) |
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 9b21ff8f..16cde0cc 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -15,6 +15,7 @@ import ( phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -25,6 +26,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -284,39 +286,46 @@ func (p *Plugin) Reset() error { } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - if br, ok := p.pubsubs[msg[i].Broker]; ok { - err := br.Publish(msg) + // Get payload + fbsMsg := message.GetRootAsMessages(m, 0) + tmpMsg := &message.Message{} + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + fbsMsg.Messages(tmpMsg, i) + + for j := 0; j < tmpMsg.TopicsLength(); j++ { + if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok { + table := tmpMsg.Table() + err := br.Publish(table.ByteVector(0)) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker()) } } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { - go func() { - p.Lock() - defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - err := p.pubsubs[msg[i].Broker].Publish(msg) - if err != nil { - p.log.Error("publish async error", "error", err) - return - } - } - } - }() +func (p *Plugin) PublishAsync(msg []byte) { + //go func() { + // p.Lock() + // defer p.Unlock() + // for i := 0; i < len(msg); i++ { + // for j := 0; j < len(msg[i].Topics); j++ { + // err := p.pubsubs[msg[i].Broker].Publish(msg) + // if err != nil { + // p.log.Error("publish async error", "error", err) + // return + // } + // } + // } + //}() } func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { |