diff options
author | Valery Piashchynski <[email protected]> | 2021-06-02 19:16:36 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-02 19:16:36 +0300 |
commit | a99c14abb333c10a9142cd2f178e001f1b1726fb (patch) | |
tree | ec46ffb3db177f9aacef75d9c7bdcd6d894bf20c | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) | |
parent | 27295b35e4f2702bf73d8ab10d10b84e527daf2b (diff) |
#698 feat(ws): replace `json` with binary flatbuffers
#698 feat(ws): replace `json` with binary flatbuffers
-rw-r--r-- | CHANGELOG.md | 15 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 3 | ||||
-rw-r--r-- | pkg/pubsub/interface.go | 8 | ||||
-rw-r--r-- | pkg/pubsub/message.fbs (renamed from plugins/websockets/schema/message.fbs) | 6 | ||||
-rw-r--r-- | pkg/pubsub/message.go | 9 | ||||
-rw-r--r-- | pkg/pubsub/message/Message.go (renamed from plugins/websockets/schema/message/Message.go) | 0 | ||||
-rw-r--r-- | pkg/pubsub/message/Messages.go | 67 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 29 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 18 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 34 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 34 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 26 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 94 | ||||
-rw-r--r-- | plugins/websockets/storage/storage.go | 11 | ||||
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 122 |
16 files changed, 355 insertions, 122 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 5afdfdc2..94dca5f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,21 @@ CHANGELOG ========= +v2.3.0 (08.06.2021) +------------------- + +## 👀 New: +- Brand new `broadcast` plugins that now have the name - `websockets` with broadcast capabilities. It can handle hundreds of +thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus on 2CPU cores and 1GB of RAM) + +- Flatbuffers binary messages for the `websockets` RPC calls under the hood. + +## 🩹 Fixes: + +- 🐛 Fix: + +--- + v2.2.1 (13.05.2021) ------------------- @@ -16,7 +16,6 @@ require ( github.com/golang/mock v1.4.4 github.com/google/flatbuffers v1.12.1 github.com/google/uuid v1.2.0 - github.com/hashicorp/go-multierror v1.1.1 github.com/json-iterator/go v1.1.11 github.com/klauspost/compress v1.12.2 // indirect github.com/olekukonko/tablewriter v0.0.5 @@ -188,14 +188,11 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index caf8783f..18c1a80c 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -1,5 +1,7 @@ package pubsub +import "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + // PubSub ... type PubSub interface { Publisher @@ -19,14 +21,14 @@ type Subscriber interface { // Publisher publish one or more messages type Publisher interface { // Publish one or multiple Channel. - Publish(messages []*Message) error + Publish(messages []byte) error // PublishAsync publish message and return immediately // If error occurred it will be printed into the logger - PublishAsync(messages []*Message) + PublishAsync(messages []byte) } // Reader interface should return next message type Reader interface { - Next() (*Message, error) + Next() (*message.Message, error) } diff --git a/plugins/websockets/schema/message.fbs b/pkg/pubsub/message.fbs index f2d92c78..7e975894 100644 --- a/plugins/websockets/schema/message.fbs +++ b/pkg/pubsub/message.fbs @@ -7,4 +7,8 @@ table Message { payload:[byte]; } -root_type Message; +table Messages { + messages:[Message]; +} + +root_type Messages; diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go index c17d153b..74348722 100644 --- a/pkg/pubsub/message.go +++ b/pkg/pubsub/message.go @@ -1,9 +1,5 @@ package pubsub -import ( - json "github.com/json-iterator/go" -) - type Message struct { // Command (join, leave, headers) Command string `json:"command"` @@ -17,8 +13,3 @@ type Message struct { // Payload to be broadcasted Payload []byte `json:"payload"` } - -// MarshalBinary needed to marshal message for the redis -func (m *Message) MarshalBinary() ([]byte, error) { - return json.Marshal(m) -} diff --git a/plugins/websockets/schema/message/Message.go b/pkg/pubsub/message/Message.go index 26bbd12c..26bbd12c 100644 --- a/plugins/websockets/schema/message/Message.go +++ b/pkg/pubsub/message/Message.go diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go new file mode 100644 index 00000000..633b367d --- /dev/null +++ b/pkg/pubsub/message/Messages.go @@ -0,0 +1,67 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Messages struct { + _tab flatbuffers.Table +} + +func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Messages{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Messages{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Messages) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Messages) Messages(obj *Message, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Messages) MessagesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MessagesStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0) +} +func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 49c187bc..eb87b39e 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -3,8 +3,9 @@ package memory import ( "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -15,14 +16,14 @@ type Plugin struct { log logger.Logger // channel with the messages from the RPC - pushCh chan *pubsub.Message + pushCh chan []byte // user-subscribed topics topics sync.Map } func (p *Plugin) Init(log logger.Logger) error { p.log = log - p.pushCh = make(chan *pubsub.Message, 100) + p.pushCh = make(chan []byte, 100) return nil } @@ -34,18 +35,14 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Publish(messages []*pubsub.Message) error { - for i := 0; i < len(messages); i++ { - p.pushCh <- messages[i] - } +func (p *Plugin) Publish(messages []byte) error { + p.pushCh <- messages return nil } -func (p *Plugin) PublishAsync(messages []*pubsub.Message) { +func (p *Plugin) PublishAsync(messages []byte) { go func() { - for i := 0; i < len(messages); i++ { - p.pushCh <- messages[i] - } + p.pushCh <- messages }() } @@ -63,18 +60,20 @@ func (p *Plugin) Unsubscribe(topics ...string) error { return nil } -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil } + fbsMsg := message.GetRootAsMessage(msg, 0) + // push only messages, which are subscribed // TODO better??? - for i := 0; i < len(msg.Topics); i++ { - if _, ok := p.topics.Load(msg.Topics[i]); ok { - return msg, nil + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok { + return fbsMsg, nil } } return nil, nil diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 93b13124..3082f24f 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,8 +4,7 @@ import ( "context" "sync" - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/go-redis/redis/v8" @@ -22,13 +21,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *pubsub.Message + out chan *message.Message exit chan struct{} } func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *pubsub.Message, 100) + out := make(chan *message.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -65,14 +64,7 @@ func (fi *FanIn) read() { if !ok { return } - m := &pubsub.Message{} - err := json.Unmarshal(utils.AsBytes(msg.Payload), m) - if err != nil { - fi.log.Error("failed to unmarshal payload", "error", err.Error()) - continue - } - - fi.out <- m + fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) case <-fi.exit: return } @@ -95,6 +87,6 @@ func (fi *FanIn) Stop() error { return nil } -func (fi *FanIn) Consume() <-chan *pubsub.Message { +func (fi *FanIn) Consume() <-chan *message.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index c1480de8..5b9de5fc 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,9 +6,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const PluginName = "redis" @@ -101,32 +102,31 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - return f.Err() - } + fbsMsg := message.GetRootAsMessage(msg, 0) + + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + return f.Err() } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { +func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - continue - } + fbsMsg := message.GetRootAsMessage(msg, 0) + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) + return } } }() @@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { return <-p.fanin.Consume(), nil } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 9b21ff8f..fe55d30e 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -15,6 +15,7 @@ import ( phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -25,6 +26,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -225,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } - p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -284,36 +285,39 @@ func (p *Plugin) Reset() error { } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - if br, ok := p.pubsubs[msg[i].Broker]; ok { - err := br.Publish(msg) - if err != nil { - return errors.E(err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) + // Get payload + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) + if err != nil { + return errors.E(err) } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { +func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - err := p.pubsubs[msg[i].Broker].Publish(msg) + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) if err != nil { p.log.Error("publish async error", "error", err) return } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) } } }() diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 8f18580f..7fcc873b 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,10 +4,11 @@ import ( "sync" "github.com/fasthttp/websocket" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" + "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { @@ -16,7 +17,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *pubsub.Message + queue chan *message.Message exit chan struct{} } @@ -24,7 +25,7 @@ type WorkersPool struct { func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *pubsub.Message, 100), + queue: make(chan *message.Message, 100), storage: storage, log: log, exit: make(chan struct{}), @@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. return wp } -func (wp *WorkersPool) Queue(msg *pubsub.Message) { +func (wp *WorkersPool) Queue(msg *message.Message) { wp.queue <- msg } @@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit if !ok { return } - // do not handle nil's + _ = msg if msg == nil { continue } - if len(msg.Topics) == 0 { + if msg.TopicsLength() == 0 { continue } res := wp.get() - // get connections for the particular topic - wp.storage.GetByPtr(msg.Topics, res) + for i := 0; i < msg.TopicsLength(); i++ { + // get connections for the particular topic + wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res) + } if len(res) == 0 { wp.log.Info("no such topic", "topic", msg.Topics) wp.put(res) @@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit } conn := c.(*connection.Connection) - err := conn.Write(websocket.BinaryMessage, msg.Payload) + // TODO sync pool for the bytes + bb := make([]byte, msg.PayloadLength()) + for i := 0; i < msg.PayloadLength(); i++ { + bb[i] = byte(msg.Payload(i)) + } + err := conn.Write(websocket.BinaryMessage, bb) if err != nil { wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) wp.put(res) diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 2fb0f1b9..6c2cacb4 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -1,8 +1,9 @@ package websockets import ( + flatbuffers "github.com/google/flatbuffers/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,9 +13,11 @@ type rpc struct { log logger.Logger } -func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { +// Publish ... msg is a flatbuffers decoded payload +// see: pkg/pubsub/message.fbs +func (r *rpc) Publish(msg []byte, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published", "msg", msg) + r.log.Debug("message published") // just return in case of nil message if msg == nil { @@ -22,16 +25,36 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { return nil } - err := r.plugin.Publish(msg) - if err != nil { - *ok = false - return errors.E(op, err) + fbsMsg := message.GetRootAsMessages(msg, 0) + tmpMsg := &message.Message{} + + b := flatbuffers.NewBuilder(100) + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + // init a message + fbsMsg.Messages(tmpMsg, i) + + // overhead HERE + orig := serializeMsg(b, tmpMsg) + bb := make([]byte, len(orig)) + copy(bb, orig) + + err := r.plugin.Publish(bb) + if err != nil { + *ok = false + b.Reset() + return errors.E(op, err) + } + b.Reset() } + *ok = true return nil } -func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { +// PublishAsync ... +// see: pkg/pubsub/message.fbs +func (r *rpc) PublishAsync(msg []byte, ok *bool) error { r.log.Debug("message published", "msg", msg) // just return in case of nil message @@ -39,9 +62,60 @@ func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { *ok = true return nil } - // publish to the registered broker - r.plugin.PublishAsync(msg) + + fbsMsg := message.GetRootAsMessages(msg, 0) + tmpMsg := &message.Message{} + + b := flatbuffers.NewBuilder(100) + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + // init a message + fbsMsg.Messages(tmpMsg, i) + + // overhead HERE + orig := serializeMsg(b, tmpMsg) + bb := make([]byte, len(orig)) + copy(bb, orig) + + r.plugin.PublishAsync(bb) + b.Reset() + } *ok = true return nil } + +func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte { + cmdOff := b.CreateByteString(msg.Command()) + brokerOff := b.CreateByteString(msg.Broker()) + + offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength()) + for j := msg.TopicsLength() - 1; j >= 0; j-- { + offsets[j] = b.CreateByteString(msg.Topics(j)) + } + + message.MessageStartTopicsVector(b, len(offsets)) + + for j := len(offsets) - 1; j >= 0; j-- { + b.PrependUOffsetT(offsets[j]) + } + + tOff := b.EndVector(len(offsets)) + bb := make([]byte, msg.PayloadLength()) + for i := 0; i < msg.PayloadLength(); i++ { + bb[i] = byte(msg.Payload(i)) + } + pOff := b.CreateByteVector(bb) + + message.MessageStart(b) + + message.MessageAddCommand(b, cmdOff) + message.MessageAddBroker(b, brokerOff) + message.MessageAddTopics(b, tOff) + message.MessageAddPayload(b, pOff) + + fOff := message.MessageEnd(b) + b.Finish(fOff) + + return b.FinishedBytes() +} diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go index ac256be2..43834658 100644 --- a/plugins/websockets/storage/storage.go +++ b/plugins/websockets/storage/storage.go @@ -63,6 +63,17 @@ func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) { } } } +func (s *Storage) GetOneByPtr(topic string, res map[string]struct{}) { + s.RLock() + defer s.RUnlock() + + d := s.BST.Get(topic) + if len(d) > 0 { + for ii := range d { + res[ii] = struct{}{} + } + } +} func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) { s.RLock() diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 772b53ac..b2c756bf 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -13,9 +13,12 @@ import ( "time" "github.com/fasthttp/websocket" + flatbuffers "github.com/google/flatbuffers/go" json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -29,7 +32,7 @@ import ( ) type Msg struct { - // Topic message been pushed into. + // Topic makeMessage been pushed into. Topics []string `json:"topic"` // Command (join, leave, headers) @@ -131,7 +134,7 @@ func wsInit(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -241,7 +244,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -258,14 +261,14 @@ func RPCWsMemoryPubAsync(t *testing.T) { publishAsync("", "memory", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -288,7 +291,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { publishAsync2("", "memory", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -315,7 +318,7 @@ func RPCWsMemory(t *testing.T) { } }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -332,14 +335,14 @@ func RPCWsMemory(t *testing.T) { publish("", "memory", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -362,7 +365,7 @@ func RPCWsMemory(t *testing.T) { publish2("", "memory", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -387,7 +390,7 @@ func RPCWsRedis(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -404,14 +407,14 @@ func RPCWsRedis(t *testing.T) { publish("", "redis", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -434,7 +437,7 @@ func RPCWsRedis(t *testing.T) { publish2("", "redis", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -537,7 +540,7 @@ func RPCWsMemoryDeny(t *testing.T) { } }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -553,7 +556,7 @@ func RPCWsMemoryDeny(t *testing.T) { assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -758,7 +761,7 @@ func RPCWsMemoryAllow(t *testing.T) { } }() - d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -775,14 +778,14 @@ func RPCWsMemoryAllow(t *testing.T) { publish("", "memory", "foo") - // VERIFY a message + // VERIFY a makeMessage _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) assert.Equal(t, "hello, PHP", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -805,7 +808,7 @@ func RPCWsMemoryAllow(t *testing.T) { publish2("", "memory", "foo2") }() - // should be only message from the subscribed foo2 topic + // should be only makeMessage from the subscribed foo2 topic _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) @@ -824,7 +827,7 @@ func publish(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret) if err != nil { panic(err) } @@ -839,7 +842,7 @@ func publishAsync(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret) if err != nil { panic(err) } @@ -854,7 +857,7 @@ func publishAsync2(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret) if err != nil { panic(err) } @@ -869,13 +872,12 @@ func publish2(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool - err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret) if err != nil { panic(err) } } - -func message(command string, broker string, payload []byte, topics ...string) *Msg { +func messageWS(command string, broker string, payload []byte, topics ...string) *Msg { return &Msg{ Topics: topics, Command: command, @@ -883,3 +885,71 @@ func message(command string, broker string, payload []byte, topics ...string) *M Payload: payload, } } + +func makeMessage(command string, broker string, payload []byte, topics ...string) []byte { + m := []pubsub.Message{ + { + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + }, + } + + b := flatbuffers.NewBuilder(1) + + return msgs(b, m) +} + +func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT { + cmdOff := b.CreateString(msg.Command) + brokerOff := b.CreateString(msg.Broker) + + offsets := make([]flatbuffers.UOffsetT, len(msg.Topics)) + for j := len(msg.Topics) - 1; j >= 0; j-- { + offsets[j] = b.CreateString(msg.Topics[j]) + } + + message.MessageStartTopicsVector(b, len(offsets)) + + for j := len(offsets) - 1; j >= 0; j-- { + b.PrependUOffsetT(offsets[j]) + } + + tOff := b.EndVector(len(offsets)) + pOff := b.CreateByteVector(msg.Payload) + + message.MessageStart(b) + + message.MessageAddCommand(b, cmdOff) + message.MessageAddBroker(b, brokerOff) + message.MessageAddTopics(b, tOff) + message.MessageAddPayload(b, pOff) + + return message.MessageEnd(b) +} + +func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte { + b.Reset() + + mOff := make([]flatbuffers.UOffsetT, len(msgs)) + + for i := len(msgs) - 1; i >= 0; i-- { + mOff[i] = serializeMsg(b, msgs[i]) + } + + message.MessagesStartMessagesVector(b, len(mOff)) + + for i := len(mOff) - 1; i >= 0; i-- { + b.PrependUOffsetT(mOff[i]) + } + + msgsOff := b.EndVector(len(msgs)) + + message.MessagesStart(b) + message.MessagesAddMessages(b, msgsOff) + fOff := message.MessagesEnd(b) + b.Finish(fOff) + + return b.Bytes[b.Head():] +} |