diff options
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/plugin.go | 49 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 7 | ||||
-rw-r--r-- | plugins/websockets/schema/message.fbs | 10 | ||||
-rw-r--r-- | plugins/websockets/schema/message/Message.go | 118 |
4 files changed, 32 insertions, 152 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 9b21ff8f..16cde0cc 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -15,6 +15,7 @@ import ( phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -25,6 +26,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -284,39 +286,46 @@ func (p *Plugin) Reset() error { } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - if br, ok := p.pubsubs[msg[i].Broker]; ok { - err := br.Publish(msg) + // Get payload + fbsMsg := message.GetRootAsMessages(m, 0) + tmpMsg := &message.Message{} + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + fbsMsg.Messages(tmpMsg, i) + + for j := 0; j < tmpMsg.TopicsLength(); j++ { + if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok { + table := tmpMsg.Table() + err := br.Publish(table.ByteVector(0)) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker()) } } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { - go func() { - p.Lock() - defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - err := p.pubsubs[msg[i].Broker].Publish(msg) - if err != nil { - p.log.Error("publish async error", "error", err) - return - } - } - } - }() +func (p *Plugin) PublishAsync(msg []byte) { + //go func() { + // p.Lock() + // defer p.Unlock() + // for i := 0; i < len(msg); i++ { + // for j := 0; j < len(msg[i].Topics); j++ { + // err := p.pubsubs[msg[i].Broker].Publish(msg) + // if err != nil { + // p.log.Error("publish async error", "error", err) + // return + // } + // } + // } + //}() } func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 2fb0f1b9..d915aa43 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -2,7 +2,6 @@ package websockets import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,9 +11,9 @@ type rpc struct { log logger.Logger } -func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { +func (r *rpc) Publish(msg []byte, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published", "msg", msg) + r.log.Debug("message published") // just return in case of nil message if msg == nil { @@ -31,7 +30,7 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { return nil } -func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { +func (r *rpc) PublishAsync(msg []byte, ok *bool) error { r.log.Debug("message published", "msg", msg) // just return in case of nil message diff --git a/plugins/websockets/schema/message.fbs b/plugins/websockets/schema/message.fbs deleted file mode 100644 index f2d92c78..00000000 --- a/plugins/websockets/schema/message.fbs +++ /dev/null @@ -1,10 +0,0 @@ -namespace message; - -table Message { - command:string; - broker:string; - topics:[string]; - payload:[byte]; -} - -root_type Message; diff --git a/plugins/websockets/schema/message/Message.go b/plugins/websockets/schema/message/Message.go deleted file mode 100644 index 26bbd12c..00000000 --- a/plugins/websockets/schema/message/Message.go +++ /dev/null @@ -1,118 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package message - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Message struct { - _tab flatbuffers.Table -} - -func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Message{} - x.Init(buf, n+offset) - return x -} - -func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { - n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) - x := &Message{} - x.Init(buf, n+offset+flatbuffers.SizeUint32) - return x -} - -func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Message) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Message) Command() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Message) Broker() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Message) Topics(j int) []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) - } - return nil -} - -func (rcv *Message) TopicsLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func (rcv *Message) Payload(j int) int8 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1)) - } - return 0 -} - -func (rcv *Message) PayloadLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func (rcv *Message) MutatePayload(j int, n int8) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n) - } - return false -} - -func MessageStart(builder *flatbuffers.Builder) { - builder.StartObject(4) -} -func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0) -} -func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0) -} -func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0) -} -func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(4, numElems, 4) -} -func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0) -} -func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(1, numElems, 1) -} -func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} |