summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/plugin.go49
-rw-r--r--plugins/websockets/rpc.go7
-rw-r--r--plugins/websockets/schema/message.fbs10
-rw-r--r--plugins/websockets/schema/message/Message.go118
4 files changed, 32 insertions, 152 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 9b21ff8f..16cde0cc 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -15,6 +15,7 @@ import (
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -25,6 +26,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -284,39 +286,46 @@ func (p *Plugin) Reset() error {
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- if br, ok := p.pubsubs[msg[i].Broker]; ok {
- err := br.Publish(msg)
+ // Get payload
+ fbsMsg := message.GetRootAsMessages(m, 0)
+ tmpMsg := &message.Message{}
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ fbsMsg.Messages(tmpMsg, i)
+
+ for j := 0; j < tmpMsg.TopicsLength(); j++ {
+ if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
+ table := tmpMsg.Table()
+ err := br.Publish(table.ByteVector(0))
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
}
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
- go func() {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- err := p.pubsubs[msg[i].Broker].Publish(msg)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- return
- }
- }
- }
- }()
+func (p *Plugin) PublishAsync(msg []byte) {
+ //go func() {
+ // p.Lock()
+ // defer p.Unlock()
+ // for i := 0; i < len(msg); i++ {
+ // for j := 0; j < len(msg[i].Topics); j++ {
+ // err := p.pubsubs[msg[i].Broker].Publish(msg)
+ // if err != nil {
+ // p.log.Error("publish async error", "error", err)
+ // return
+ // }
+ // }
+ // }
+ //}()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 2fb0f1b9..d915aa43 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -2,7 +2,6 @@ package websockets
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -12,9 +11,9 @@ type rpc struct {
log logger.Logger
}
-func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
+func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published", "msg", msg)
+ r.log.Debug("message published")
// just return in case of nil message
if msg == nil {
@@ -31,7 +30,7 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
return nil
}
-func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
+func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
// just return in case of nil message
diff --git a/plugins/websockets/schema/message.fbs b/plugins/websockets/schema/message.fbs
deleted file mode 100644
index f2d92c78..00000000
--- a/plugins/websockets/schema/message.fbs
+++ /dev/null
@@ -1,10 +0,0 @@
-namespace message;
-
-table Message {
- command:string;
- broker:string;
- topics:[string];
- payload:[byte];
-}
-
-root_type Message;
diff --git a/plugins/websockets/schema/message/Message.go b/plugins/websockets/schema/message/Message.go
deleted file mode 100644
index 26bbd12c..00000000
--- a/plugins/websockets/schema/message/Message.go
+++ /dev/null
@@ -1,118 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package message
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Message struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Message{}
- x.Init(buf, n+offset)
- return x
-}
-
-func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
- n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
- x := &Message{}
- x.Init(buf, n+offset+flatbuffers.SizeUint32)
- return x
-}
-
-func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Message) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Message) Command() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Message) Broker() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Message) Topics(j int) []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- a := rcv._tab.Vector(o)
- return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
- }
- return nil
-}
-
-func (rcv *Message) TopicsLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func (rcv *Message) Payload(j int) int8 {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
- if o != 0 {
- a := rcv._tab.Vector(o)
- return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1))
- }
- return 0
-}
-
-func (rcv *Message) PayloadLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func (rcv *Message) MutatePayload(j int, n int8) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
- if o != 0 {
- a := rcv._tab.Vector(o)
- return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n)
- }
- return false
-}
-
-func MessageStart(builder *flatbuffers.Builder) {
- builder.StartObject(4)
-}
-func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0)
-}
-func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0)
-}
-func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0)
-}
-func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0)
-}
-func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(1, numElems, 1)
-}
-func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}