diff options
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 40 |
1 files changed, 27 insertions, 13 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 4c0edcad..6ddd609c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -23,9 +23,10 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const ( @@ -79,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server + // attach default driver + p.pubsubs["memory"] = memory.NewInMemory(p.log) + return nil } @@ -301,16 +305,21 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() + msg := &websocketsv1.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + return err + } + // Get payload - fbsMsg := message.GetRootAsMessage(m, 0) - for i := 0; i < fbsMsg.TopicsLength(); i++ { - if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { - err := br.Publish(fbsMsg.Table().Bytes) + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.pubsubs[msg.GetBroker()]; ok { + err := br.Publish(m) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) } } return nil @@ -320,16 +329,21 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(m, 0) - for i := 0; i < fbsMsg.TopicsLength(); i++ { - if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { - err := br.Publish(fbsMsg.Table().Bytes) + msg := &websocketsv1.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + p.log.Error("message unmarshal") + } + + // Get payload + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.pubsubs[msg.GetBroker()]; ok { + err := br.Publish(m) if err != nil { p.log.Error("publish async error", "error", err) - return } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) } } }() |