summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
committerValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
commit12c031ce76c505128ebf9daafa91952855f202d4 (patch)
tree51846c0cd8a452246e383deb2ac00cce9ef1b92c /plugins/websockets
parent352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff)
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/plugin.go55
-rw-r--r--plugins/websockets/pool/workers_pool.go26
-rw-r--r--plugins/websockets/rpc.go87
-rw-r--r--plugins/websockets/storage/storage.go11
4 files changed, 134 insertions, 45 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 16cde0cc..fe55d30e 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -227,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
-
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -291,41 +290,37 @@ func (p *Plugin) Publish(m []byte) error {
defer p.Unlock()
// Get payload
- fbsMsg := message.GetRootAsMessages(m, 0)
- tmpMsg := &message.Message{}
-
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- fbsMsg.Messages(tmpMsg, i)
-
- for j := 0; j < tmpMsg.TopicsLength(); j++ {
- if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
- table := tmpMsg.Table()
- err := br.Publish(table.ByteVector(0))
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ return errors.E(err)
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []byte) {
- //go func() {
- // p.Lock()
- // defer p.Unlock()
- // for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // err := p.pubsubs[msg[i].Broker].Publish(msg)
- // if err != nil {
- // p.log.Error("publish async error", "error", err)
- // return
- // }
- // }
- // }
- //}()
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ }
+ }
+ }()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 8f18580f..7fcc873b 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,10 +4,11 @@ import (
"sync"
"github.com/fasthttp/websocket"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
@@ -16,7 +17,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *pubsub.Message
+ queue chan *message.Message
exit chan struct{}
}
@@ -24,7 +25,7 @@ type WorkersPool struct {
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *pubsub.Message, 100),
+ queue: make(chan *message.Message, 100),
storage: storage,
log: log,
exit: make(chan struct{}),
@@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return wp
}
-func (wp *WorkersPool) Queue(msg *pubsub.Message) {
+func (wp *WorkersPool) Queue(msg *message.Message) {
wp.queue <- msg
}
@@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if !ok {
return
}
- // do not handle nil's
+ _ = msg
if msg == nil {
continue
}
- if len(msg.Topics) == 0 {
+ if msg.TopicsLength() == 0 {
continue
}
res := wp.get()
- // get connections for the particular topic
- wp.storage.GetByPtr(msg.Topics, res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ // get connections for the particular topic
+ wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ }
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.Topics)
wp.put(res)
@@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit
}
conn := c.(*connection.Connection)
- err := conn.Write(websocket.BinaryMessage, msg.Payload)
+ // TODO sync pool for the bytes
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ err := conn.Write(websocket.BinaryMessage, bb)
if err != nil {
wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
wp.put(res)
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index d915aa43..6c2cacb4 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,7 +1,9 @@
package websockets
import (
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -11,6 +13,8 @@ type rpc struct {
log logger.Logger
}
+// Publish ... msg is a flatbuffers decoded payload
+// see: pkg/pubsub/message.fbs
func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
r.log.Debug("message published")
@@ -21,15 +25,35 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
return nil
}
- err := r.plugin.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ err := r.plugin.Publish(bb)
+ if err != nil {
+ *ok = false
+ b.Reset()
+ return errors.E(op, err)
+ }
+ b.Reset()
}
+
*ok = true
return nil
}
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
@@ -38,9 +62,60 @@ func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
*ok = true
return nil
}
- // publish to the registered broker
- r.plugin.PublishAsync(msg)
+
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ r.plugin.PublishAsync(bb)
+ b.Reset()
+ }
*ok = true
return nil
}
+
+func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
+ cmdOff := b.CreateByteString(msg.Command())
+ brokerOff := b.CreateByteString(msg.Broker())
+
+ offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
+ for j := msg.TopicsLength() - 1; j >= 0; j-- {
+ offsets[j] = b.CreateByteString(msg.Topics(j))
+ }
+
+ message.MessageStartTopicsVector(b, len(offsets))
+
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
+
+ tOff := b.EndVector(len(offsets))
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ pOff := b.CreateByteVector(bb)
+
+ message.MessageStart(b)
+
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ fOff := message.MessageEnd(b)
+ b.Finish(fOff)
+
+ return b.FinishedBytes()
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
index ac256be2..43834658 100644
--- a/plugins/websockets/storage/storage.go
+++ b/plugins/websockets/storage/storage.go
@@ -63,6 +63,17 @@ func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) {
}
}
}
+func (s *Storage) GetOneByPtr(topic string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ d := s.BST.Get(topic)
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+}
func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) {
s.RLock()