summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
committerValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
commit12c031ce76c505128ebf9daafa91952855f202d4 (patch)
tree51846c0cd8a452246e383deb2ac00cce9ef1b92c
parent352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff)
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--go.mod1
-rw-r--r--go.sum3
-rw-r--r--pkg/pubsub/interface.go4
-rw-r--r--plugins/memory/plugin.go13
-rw-r--r--plugins/redis/fanin.go7
-rw-r--r--plugins/redis/plugin.go28
-rw-r--r--plugins/websockets/plugin.go55
-rw-r--r--plugins/websockets/pool/workers_pool.go26
-rw-r--r--plugins/websockets/rpc.go87
-rw-r--r--plugins/websockets/storage/storage.go11
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go130
11 files changed, 232 insertions, 133 deletions
diff --git a/go.mod b/go.mod
index 19a9156a..10ad1ec6 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,6 @@ require (
github.com/golang/mock v1.4.4
github.com/google/flatbuffers v1.12.1
github.com/google/uuid v1.2.0
- github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.11
github.com/klauspost/compress v1.12.2 // indirect
github.com/olekukonko/tablewriter v0.0.5
diff --git a/go.sum b/go.sum
index 94b71b07..eaa7b44d 100644
--- a/go.sum
+++ b/go.sum
@@ -188,14 +188,11 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
-github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
-github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
-github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index 63a12b34..18c1a80c 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -1,5 +1,7 @@
package pubsub
+import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+
// PubSub ...
type PubSub interface {
Publisher
@@ -28,5 +30,5 @@ type Publisher interface {
// Reader interface should return next message
type Reader interface {
- Next() (*Message, error)
+ Next() (*message.Message, error)
}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index b9c5933a..eb87b39e 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -3,8 +3,9 @@ package memory
import (
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -59,18 +60,20 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
return nil
}
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
}
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < len(msg.Topics); i++ {
- if _, ok := p.topics.Load(msg.Topics[i]); ok {
- return msg, nil
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok {
+ return fbsMsg, nil
}
}
return nil, nil
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 6c9a5650..3082f24f 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -64,13 +64,6 @@ func (fi *FanIn) read() {
if !ok {
return
}
- //m := &pubsub.Message{}
- //err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
- //if err != nil {
- // fi.log.Error("failed to unmarshal payload", "error", err.Error())
- // continue
- //}
-
fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
case <-fi.exit:
return
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3a21204e..5b9de5fc 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,7 +6,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error {
fbsMsg := message.GetRootAsMessage(msg, 0)
for j := 0; j < fbsMsg.TopicsLength(); j++ {
- t := fbsMsg.Table()
- vec := t.ByteVector(0)
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec)
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
if f.Err() != nil {
return f.Err()
}
@@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error {
func (p *Plugin) PublishAsync(msg []byte) {
go func() {
- //p.Lock()
- //defer p.Unlock()
- //for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- // if f.Err() != nil {
- // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- // continue
- // }
- // }
- //}
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
+ return
+ }
+ }
}()
}
@@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
return <-p.fanin.Consume(), nil
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 16cde0cc..fe55d30e 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -227,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
-
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -291,41 +290,37 @@ func (p *Plugin) Publish(m []byte) error {
defer p.Unlock()
// Get payload
- fbsMsg := message.GetRootAsMessages(m, 0)
- tmpMsg := &message.Message{}
-
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- fbsMsg.Messages(tmpMsg, i)
-
- for j := 0; j < tmpMsg.TopicsLength(); j++ {
- if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
- table := tmpMsg.Table()
- err := br.Publish(table.ByteVector(0))
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ return errors.E(err)
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []byte) {
- //go func() {
- // p.Lock()
- // defer p.Unlock()
- // for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // err := p.pubsubs[msg[i].Broker].Publish(msg)
- // if err != nil {
- // p.log.Error("publish async error", "error", err)
- // return
- // }
- // }
- // }
- //}()
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ }
+ }
+ }()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 8f18580f..7fcc873b 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,10 +4,11 @@ import (
"sync"
"github.com/fasthttp/websocket"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
@@ -16,7 +17,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *pubsub.Message
+ queue chan *message.Message
exit chan struct{}
}
@@ -24,7 +25,7 @@ type WorkersPool struct {
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *pubsub.Message, 100),
+ queue: make(chan *message.Message, 100),
storage: storage,
log: log,
exit: make(chan struct{}),
@@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return wp
}
-func (wp *WorkersPool) Queue(msg *pubsub.Message) {
+func (wp *WorkersPool) Queue(msg *message.Message) {
wp.queue <- msg
}
@@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if !ok {
return
}
- // do not handle nil's
+ _ = msg
if msg == nil {
continue
}
- if len(msg.Topics) == 0 {
+ if msg.TopicsLength() == 0 {
continue
}
res := wp.get()
- // get connections for the particular topic
- wp.storage.GetByPtr(msg.Topics, res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ // get connections for the particular topic
+ wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ }
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.Topics)
wp.put(res)
@@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit
}
conn := c.(*connection.Connection)
- err := conn.Write(websocket.BinaryMessage, msg.Payload)
+ // TODO sync pool for the bytes
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ err := conn.Write(websocket.BinaryMessage, bb)
if err != nil {
wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
wp.put(res)
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index d915aa43..6c2cacb4 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,7 +1,9 @@
package websockets
import (
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -11,6 +13,8 @@ type rpc struct {
log logger.Logger
}
+// Publish ... msg is a flatbuffers decoded payload
+// see: pkg/pubsub/message.fbs
func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
r.log.Debug("message published")
@@ -21,15 +25,35 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
return nil
}
- err := r.plugin.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ err := r.plugin.Publish(bb)
+ if err != nil {
+ *ok = false
+ b.Reset()
+ return errors.E(op, err)
+ }
+ b.Reset()
}
+
*ok = true
return nil
}
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
@@ -38,9 +62,60 @@ func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
*ok = true
return nil
}
- // publish to the registered broker
- r.plugin.PublishAsync(msg)
+
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ r.plugin.PublishAsync(bb)
+ b.Reset()
+ }
*ok = true
return nil
}
+
+func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
+ cmdOff := b.CreateByteString(msg.Command())
+ brokerOff := b.CreateByteString(msg.Broker())
+
+ offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
+ for j := msg.TopicsLength() - 1; j >= 0; j-- {
+ offsets[j] = b.CreateByteString(msg.Topics(j))
+ }
+
+ message.MessageStartTopicsVector(b, len(offsets))
+
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
+
+ tOff := b.EndVector(len(offsets))
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ pOff := b.CreateByteVector(bb)
+
+ message.MessageStart(b)
+
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ fOff := message.MessageEnd(b)
+ b.Finish(fOff)
+
+ return b.FinishedBytes()
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
index ac256be2..43834658 100644
--- a/plugins/websockets/storage/storage.go
+++ b/plugins/websockets/storage/storage.go
@@ -63,6 +63,17 @@ func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) {
}
}
}
+func (s *Storage) GetOneByPtr(topic string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ d := s.BST.Get(topic)
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+}
func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) {
s.RLock()
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index f5289752..b2c756bf 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -18,7 +18,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- message2 "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -32,7 +32,7 @@ import (
)
type Msg struct {
- // Topic message been pushed into.
+ // Topic makeMessage been pushed into.
Topics []string `json:"topic"`
// Command (join, leave, headers)
@@ -134,7 +134,7 @@ func wsInit(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -244,7 +244,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -261,14 +261,14 @@ func RPCWsMemoryPubAsync(t *testing.T) {
publishAsync("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -291,7 +291,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
publishAsync2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -318,7 +318,7 @@ func RPCWsMemory(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -335,14 +335,14 @@ func RPCWsMemory(t *testing.T) {
publish("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -365,7 +365,7 @@ func RPCWsMemory(t *testing.T) {
publish2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -390,7 +390,7 @@ func RPCWsRedis(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -407,14 +407,14 @@ func RPCWsRedis(t *testing.T) {
publish("", "redis", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -437,7 +437,7 @@ func RPCWsRedis(t *testing.T) {
publish2("", "redis", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -540,7 +540,7 @@ func RPCWsMemoryDeny(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -556,7 +556,7 @@ func RPCWsMemoryDeny(t *testing.T) {
assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -761,7 +761,7 @@ func RPCWsMemoryAllow(t *testing.T) {
}
}()
- d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -778,14 +778,14 @@ func RPCWsMemoryAllow(t *testing.T) {
publish("", "memory", "foo")
- // VERIFY a message
+ // VERIFY a makeMessage
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
assert.Equal(t, "hello, PHP", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -808,7 +808,7 @@ func RPCWsMemoryAllow(t *testing.T) {
publish2("", "memory", "foo2")
}()
- // should be only message from the subscribed foo2 topic
+ // should be only makeMessage from the subscribed foo2 topic
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
@@ -827,7 +827,7 @@ func publish(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -842,7 +842,7 @@ func publishAsync(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -857,7 +857,7 @@ func publishAsync2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
if err != nil {
panic(err)
}
@@ -872,13 +872,12 @@ func publish2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
- err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
if err != nil {
panic(err)
}
}
-
-func message(command string, broker string, payload []byte, topics ...string) *Msg {
+func messageWS(command string, broker string, payload []byte, topics ...string) *Msg {
return &Msg{
Topics: topics,
Command: command,
@@ -887,49 +886,70 @@ func message(command string, broker string, payload []byte, topics ...string) *M
}
}
-func makePayload(b *flatbuffers.Builder, storage string, items []pubsub.Message) []byte {
- b.Reset()
+func makeMessage(command string, broker string, payload []byte, topics ...string) []byte {
+ m := []pubsub.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
+ }
- storageOffset := b.CreateString(storage)
+ b := flatbuffers.NewBuilder(1)
- // //////////////////// ITEMS VECTOR ////////////////////////////
- offset := make([]flatbuffers.UOffsetT, len(items))
- for i := len(items) - 1; i >= 0; i-- {
- offset[i] = serializeItems(b, items[i])
- }
+ return msgs(b, m)
+}
- message2.MessageStartTopicsVector(b, len(offset))
+func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT {
+ cmdOff := b.CreateString(msg.Command)
+ brokerOff := b.CreateString(msg.Broker)
- for i := len(offset) - 1; i >= 0; i-- {
- b.PrependUOffsetT(offset[i])
+ offsets := make([]flatbuffers.UOffsetT, len(msg.Topics))
+ for j := len(msg.Topics) - 1; j >= 0; j-- {
+ offsets[j] = b.CreateString(msg.Topics[j])
}
- itemsOffset := b.EndVector(len(offset))
- // /////////////////////////////////////////////////////////////////
+ message.MessageStartTopicsVector(b, len(offsets))
- message2.MessageStart(b)
- message2.MessagesAddMessages(b, itemsOffset)
- message2.PayloadAddStorage(b, storageOffset)
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
- finalOffset := message2.PayloadEnd(b)
+ tOff := b.EndVector(len(offsets))
+ pOff := b.CreateByteVector(msg.Payload)
- b.Finish(finalOffset)
+ message.MessageStart(b)
- return b.Bytes[b.Head():]
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ return message.MessageEnd(b)
}
-func serializeItems(b *flatbuffers.Builder, item pubsub.Message) flatbuffers.UOffsetT {
- br := b.CreateString(item.Broker)
- cmd := b.CreateString(item.Command)
- payload := b.CreateByteVector(item.Payload)
+func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte {
+ b.Reset()
+ mOff := make([]flatbuffers.UOffsetT, len(msgs))
+ for i := len(msgs) - 1; i >= 0; i-- {
+ mOff[i] = serializeMsg(b, msgs[i])
+ }
- message2.MessageStart(b)
+ message.MessagesStartMessagesVector(b, len(mOff))
- message2.MessageAddBroker(b, br)
- message2.MessageAddCommand(b, cmd)
- message2.MessageAddPayload(b, payload)
+ for i := len(mOff) - 1; i >= 0; i-- {
+ b.PrependUOffsetT(mOff[i])
+ }
- return message2.MessageEnd(b)
+ msgsOff := b.EndVector(len(msgs))
+
+ message.MessagesStart(b)
+ message.MessagesAddMessages(b, msgsOff)
+ fOff := message.MessagesEnd(b)
+ b.Finish(fOff)
+
+ return b.Bytes[b.Head():]
}