summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 19:16:36 +0300
committerGitHub <[email protected]>2021-06-02 19:16:36 +0300
commita99c14abb333c10a9142cd2f178e001f1b1726fb (patch)
treeec46ffb3db177f9aacef75d9c7bdcd6d894bf20c
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
parent27295b35e4f2702bf73d8ab10d10b84e527daf2b (diff)
#698 feat(ws): replace `json` with binary flatbuffers
#698 feat(ws): replace `json` with binary flatbuffers
-rw-r--r--CHANGELOG.md15
-rw-r--r--go.mod1
-rw-r--r--go.sum3
-rw-r--r--pkg/pubsub/interface.go8
-rw-r--r--pkg/pubsub/message.fbs (renamed from plugins/websockets/schema/message.fbs)6
-rw-r--r--pkg/pubsub/message.go9
-rw-r--r--pkg/pubsub/message/Message.go (renamed from plugins/websockets/schema/message/Message.go)0
-rw-r--r--pkg/pubsub/message/Messages.go67
-rw-r--r--plugins/memory/plugin.go29
-rw-r--r--plugins/redis/fanin.go18
-rw-r--r--plugins/redis/plugin.go34
-rw-r--r--plugins/websockets/plugin.go34
-rw-r--r--plugins/websockets/pool/workers_pool.go26
-rw-r--r--plugins/websockets/rpc.go94
-rw-r--r--plugins/websockets/storage/storage.go11
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go122
16 files changed, 355 insertions, 122 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5afdfdc2..94dca5f3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,21 @@
CHANGELOG
=========
+v2.3.0 (08.06.2021)
+-------------------
+
+## 👀 New:
+- Brand new `broadcast` plugins that now have the name - `websockets` with broadcast capabilities. It can handle hundreds of
+thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus on 2CPU cores and 1GB of RAM)
+
+- Flatbuffers binary messages for the `websockets` RPC calls under the hood.
+
+## 🩹 Fixes:
+
+- 🐛 Fix:
+
+---
+
v2.2.1 (13.05.2021)
-------------------
diff --git a/go.mod b/go.mod
index 19a9156a..10ad1ec6 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,6 @@ require (
github.com/golang/mock v1.4.4
github.com/google/flatbuffers v1.12.1
github.com/google/uuid v1.2.0
- github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.11
github.com/klauspost/compress v1.12.2 // indirect
github.com/olekukonko/tablewriter v0.0.5
diff --git a/go.sum b/go.sum
index 94b71b07..eaa7b44d 100644
--- a/go.sum
+++ b/go.sum
@@ -188,14 +188,11 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
-github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
-github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
-github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index caf8783f..18c1a80c 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -1,5 +1,7 @@
package pubsub
+import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+
// PubSub ...
type PubSub interface {
Publisher
@@ -19,14 +21,14 @@ type Subscriber interface {
// Publisher publish one or more messages
type Publisher interface {
// Publish one or multiple Channel.
- Publish(messages []*Message) error
+ Publish(messages []byte) error
// PublishAsync publish message and return immediately
// If error occurred it will be printed into the logger
- PublishAsync(messages []*Message)
+ PublishAsync(messages []byte)
}
// Reader interface should return next message
type Reader interface {
- Next() (*Message, error)
+ Next() (*message.Message, error)
}
diff --git a/plugins/websockets/schema/message.fbs b/pkg/pubsub/message.fbs
index f2d92c78..7e975894 100644
--- a/plugins/websockets/schema/message.fbs
+++ b/pkg/pubsub/message.fbs
@@ -7,4 +7,8 @@ table Message {
payload:[byte];
}
-root_type Message;
+table Messages {
+ messages:[Message];
+}
+
+root_type Messages;
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
index c17d153b..74348722 100644
--- a/pkg/pubsub/message.go
+++ b/pkg/pubsub/message.go
@@ -1,9 +1,5 @@
package pubsub
-import (
- json "github.com/json-iterator/go"
-)
-
type Message struct {
// Command (join, leave, headers)
Command string `json:"command"`
@@ -17,8 +13,3 @@ type Message struct {
// Payload to be broadcasted
Payload []byte `json:"payload"`
}
-
-// MarshalBinary needed to marshal message for the redis
-func (m *Message) MarshalBinary() ([]byte, error) {
- return json.Marshal(m)
-}
diff --git a/plugins/websockets/schema/message/Message.go b/pkg/pubsub/message/Message.go
index 26bbd12c..26bbd12c 100644
--- a/plugins/websockets/schema/message/Message.go
+++ b/pkg/pubsub/message/Message.go
diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go
new file mode 100644
index 00000000..633b367d
--- /dev/null
+++ b/pkg/pubsub/message/Messages.go
@@ -0,0 +1,67 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Messages struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Messages{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Messages{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Messages) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Messages) Messages(obj *Message, j int) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *Messages) MessagesLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func MessagesStart(builder *flatbuffers.Builder) {
+ builder.StartObject(1)
+}
+func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0)
+}
+func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 49c187bc..eb87b39e 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -3,8 +3,9 @@ package memory
import (
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -15,14 +16,14 @@ type Plugin struct {
log logger.Logger
// channel with the messages from the RPC
- pushCh chan *pubsub.Message
+ pushCh chan []byte
// user-subscribed topics
topics sync.Map
}
func (p *Plugin) Init(log logger.Logger) error {
p.log = log
- p.pushCh = make(chan *pubsub.Message, 100)
+ p.pushCh = make(chan []byte, 100)
return nil
}
@@ -34,18 +35,14 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Publish(messages []*pubsub.Message) error {
- for i := 0; i < len(messages); i++ {
- p.pushCh <- messages[i]
- }
+func (p *Plugin) Publish(messages []byte) error {
+ p.pushCh <- messages
return nil
}
-func (p *Plugin) PublishAsync(messages []*pubsub.Message) {
+func (p *Plugin) PublishAsync(messages []byte) {
go func() {
- for i := 0; i < len(messages); i++ {
- p.pushCh <- messages[i]
- }
+ p.pushCh <- messages
}()
}
@@ -63,18 +60,20 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
return nil
}
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
}
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < len(msg.Topics); i++ {
- if _, ok := p.topics.Load(msg.Topics[i]); ok {
- return msg, nil
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok {
+ return fbsMsg, nil
}
}
return nil, nil
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 93b13124..3082f24f 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,7 @@ import (
"context"
"sync"
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/go-redis/redis/v8"
@@ -22,13 +21,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *pubsub.Message
+ out chan *message.Message
exit chan struct{}
}
func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *pubsub.Message, 100)
+ out := make(chan *message.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,14 +64,7 @@ func (fi *FanIn) read() {
if !ok {
return
}
- m := &pubsub.Message{}
- err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("failed to unmarshal payload", "error", err.Error())
- continue
- }
-
- fi.out <- m
+ fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
case <-fi.exit:
return
}
@@ -95,6 +87,6 @@ func (fi *FanIn) Stop() error {
return nil
}
-func (fi *FanIn) Consume() <-chan *pubsub.Message {
+func (fi *FanIn) Consume() <-chan *message.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index c1480de8..5b9de5fc 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,9 +6,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const PluginName = "redis"
@@ -101,32 +102,31 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- return f.Err()
- }
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ return f.Err()
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- continue
- }
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
+ return
}
}
}()
@@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
return <-p.fanin.Consume(), nil
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 9b21ff8f..fe55d30e 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -15,6 +15,7 @@ import (
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -25,6 +26,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -225,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
-
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -284,36 +285,39 @@ func (p *Plugin) Reset() error {
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- if br, ok := p.pubsubs[msg[i].Broker]; ok {
- err := br.Publish(msg)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
+ // Get payload
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ return errors.E(err)
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- err := p.pubsubs[msg[i].Broker].Publish(msg)
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
if err != nil {
p.log.Error("publish async error", "error", err)
return
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
}()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 8f18580f..7fcc873b 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,10 +4,11 @@ import (
"sync"
"github.com/fasthttp/websocket"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
@@ -16,7 +17,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *pubsub.Message
+ queue chan *message.Message
exit chan struct{}
}
@@ -24,7 +25,7 @@ type WorkersPool struct {
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *pubsub.Message, 100),
+ queue: make(chan *message.Message, 100),
storage: storage,
log: log,
exit: make(chan struct{}),
@@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return wp
}
-func (wp *WorkersPool) Queue(msg *pubsub.Message) {
+func (wp *WorkersPool) Queue(msg *message.Message) {
wp.queue <- msg
}
@@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if !ok {
return
}
- // do not handle nil's
+ _ = msg
if msg == nil {
continue
}
- if len(msg.Topics) == 0 {
+ if msg.TopicsLength() == 0 {
continue
}
res := wp.get()
- // get connections for the particular topic
- wp.storage.GetByPtr(msg.Topics, res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ // get connections for the particular topic
+ wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ }
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.Topics)
wp.put(res)
@@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit
}
conn := c.(*connection.Connection)
- err := conn.Write(websocket.BinaryMessage, msg.Payload)
+ // TODO sync pool for the bytes
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ err := conn.Write(websocket.BinaryMessage, bb)
if err != nil {
wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
wp.put(res)
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 2fb0f1b9..6c2cacb4 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,8 +1,9 @@
package websockets
import (
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -12,9 +13,11 @@ type rpc struct {
log logger.Logger
}
-func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
+// Publish ... msg is a flatbuffers decoded payload
+// see: pkg/pubsub/message.fbs
+func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published", "msg", msg)
+ r.log.Debug("message published")
// just return in case of nil message
if msg == nil {
@@ -22,16 +25,36 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
return nil
}
- err := r.plugin.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ err := r.plugin.Publish(bb)
+ if err != nil {
+ *ok = false
+ b.Reset()
+ return errors.E(op, err)
+ }
+ b.Reset()
}
+
*ok = true
return nil
}
-func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
+func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
// just return in case of nil message
@@ -39,9 +62,60 @@ func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
*ok = true
return nil
}
- // publish to the registered broker
- r.plugin.PublishAsync(msg)
+
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ r.plugin.PublishAsync(bb)
+ b.Reset()
+ }
*ok = true
return nil
}
+
+func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
+ cmdOff := b.CreateByteString(msg.Command())
+ brokerOff := b.CreateByteString(msg.Broker())
+
+ offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
+ for j := msg.TopicsLength() - 1; j >= 0; j-- {
+ offsets[j] = b.CreateByteString(msg.Topics(j))
+ }
+
+ message.MessageStartTopicsVector(b, len(offsets))
+
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
+
+ tOff := b.EndVector(len(offsets))
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ pOff := b.CreateByteVector(bb)
+
+ message.MessageStart(b)
+
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ fOff := message.MessageEnd(b)
+ b.Finish(fOff)
+
+ return b.FinishedBytes()
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
index ac256be2..43834658 100644
--- a/plugins/websockets/storage/storage.go
+++ b/plugins/websockets/storage/storage.go
@@ -63,6 +63,17 @@ func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) {
}
}
}
+func (s *Storage) GetOneByPtr(topic string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ d := s.BST.Get(topic)
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+}
func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) {
s.RLock()
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 772b53ac..b2c756bf 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -13,9 +13,12 @@ import (
"time"
"github.com/fasthttp/websocket"
+ flatbuffers "github.com/google/flatbuffers/go"
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -29,7 +32,7 @@ import (
)
type Msg struct {
- // Topic message been pushed into.
+ // Topic makeMessage been pushed into.
Topics []string `json:"topic"`
// Command (join, leave, headers)
@@ -131,7 +134,7 @@ func wsInit(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -241,7 +244,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -258,14 +261,14 @@ func RPCWsMemoryPubAsync(t *testing.T) {
publishAsync("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -288,7 +291,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
publishAsync2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -315,7 +318,7 @@ func RPCWsMemory(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -332,14 +335,14 @@ func RPCWsMemory(t *testing.T) {
publish("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -362,7 +365,7 @@ func RPCWsMemory(t *testing.T) {
publish2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -387,7 +390,7 @@ func RPCWsRedis(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -404,14 +407,14 @@ func RPCWsRedis(t *testing.T) {
publish("", "redis", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -434,7 +437,7 @@ func RPCWsRedis(t *testing.T) {
publish2("", "redis", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -537,7 +540,7 @@ func RPCWsMemoryDeny(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -553,7 +556,7 @@ func RPCWsMemoryDeny(t *testing.T) {
assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -758,7 +761,7 @@ func RPCWsMemoryAllow(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -775,14 +778,14 @@ func RPCWsMemoryAllow(t *testing.T) {
publish("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -805,7 +808,7 @@ func RPCWsMemoryAllow(t *testing.T) {
publish2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -824,7 +827,7 @@ func publish(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -839,7 +842,7 @@ func publishAsync(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -854,7 +857,7 @@ func publishAsync2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -869,13 +872,12 @@ func publish2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
if err != nil {
panic(err)
}
}
-
-func message(command string, broker string, payload []byte, topics ...string) *Msg {
+func messageWS(command string, broker string, payload []byte, topics ...string) *Msg {
return &Msg{
Topics: topics,
Command: command,
@@ -883,3 +885,71 @@ func message(command string, broker string, payload []byte, topics ...string) *M
Payload: payload,
}
}
+
+func makeMessage(command string, broker string, payload []byte, topics ...string) []byte {
+ m := []pubsub.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
+ }
+
+ b := flatbuffers.NewBuilder(1)
+
+ return msgs(b, m)
+}
+
+func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT {
+ cmdOff := b.CreateString(msg.Command)
+ brokerOff := b.CreateString(msg.Broker)
+
+ offsets := make([]flatbuffers.UOffsetT, len(msg.Topics))
+ for j := len(msg.Topics) - 1; j >= 0; j-- {
+ offsets[j] = b.CreateString(msg.Topics[j])
+ }
+
+ message.MessageStartTopicsVector(b, len(offsets))
+
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
+
+ tOff := b.EndVector(len(offsets))
+ pOff := b.CreateByteVector(msg.Payload)
+
+ message.MessageStart(b)
+
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ return message.MessageEnd(b)
+}
+
+func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte {
+ b.Reset()
+
+ mOff := make([]flatbuffers.UOffsetT, len(msgs))
+
+ for i := len(msgs) - 1; i >= 0; i-- {
+ mOff[i] = serializeMsg(b, msgs[i])
+ }
+
+ message.MessagesStartMessagesVector(b, len(mOff))
+
+ for i := len(mOff) - 1; i >= 0; i-- {
+ b.PrependUOffsetT(mOff[i])
+ }
+
+ msgsOff := b.EndVector(len(msgs))
+
+ message.MessagesStart(b)
+ message.MessagesAddMessages(b, msgsOff)
+ fOff := message.MessagesEnd(b)
+ b.Finish(fOff)
+
+ return b.Bytes[b.Head():]
+}