summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
committerValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
commit352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch)
treed940de0ee304d3edb60daa35568c3f186dc6a8b5
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/pubsub/interface.go4
-rw-r--r--pkg/pubsub/message.fbs (renamed from plugins/websockets/schema/message.fbs)6
-rw-r--r--pkg/pubsub/message/Message.go (renamed from plugins/websockets/schema/message/Message.go)0
-rw-r--r--pkg/pubsub/message/Messages.go67
-rw-r--r--plugins/memory/plugin.go16
-rw-r--r--plugins/redis/fanin.go25
-rw-r--r--plugins/redis/plugin.go42
-rw-r--r--plugins/websockets/plugin.go49
-rw-r--r--plugins/websockets/rpc.go7
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go50
10 files changed, 197 insertions, 69 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index caf8783f..63a12b34 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -19,11 +19,11 @@ type Subscriber interface {
// Publisher publish one or more messages
type Publisher interface {
// Publish one or multiple Channel.
- Publish(messages []*Message) error
+ Publish(messages []byte) error
// PublishAsync publish message and return immediately
// If error occurred it will be printed into the logger
- PublishAsync(messages []*Message)
+ PublishAsync(messages []byte)
}
// Reader interface should return next message
diff --git a/plugins/websockets/schema/message.fbs b/pkg/pubsub/message.fbs
index f2d92c78..7e975894 100644
--- a/plugins/websockets/schema/message.fbs
+++ b/pkg/pubsub/message.fbs
@@ -7,4 +7,8 @@ table Message {
payload:[byte];
}
-root_type Message;
+table Messages {
+ messages:[Message];
+}
+
+root_type Messages;
diff --git a/plugins/websockets/schema/message/Message.go b/pkg/pubsub/message/Message.go
index 26bbd12c..26bbd12c 100644
--- a/plugins/websockets/schema/message/Message.go
+++ b/pkg/pubsub/message/Message.go
diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go
new file mode 100644
index 00000000..633b367d
--- /dev/null
+++ b/pkg/pubsub/message/Messages.go
@@ -0,0 +1,67 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Messages struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Messages{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Messages{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Messages) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Messages) Messages(obj *Message, j int) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *Messages) MessagesLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func MessagesStart(builder *flatbuffers.Builder) {
+ builder.StartObject(1)
+}
+func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0)
+}
+func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 49c187bc..b9c5933a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -15,14 +15,14 @@ type Plugin struct {
log logger.Logger
// channel with the messages from the RPC
- pushCh chan *pubsub.Message
+ pushCh chan []byte
// user-subscribed topics
topics sync.Map
}
func (p *Plugin) Init(log logger.Logger) error {
p.log = log
- p.pushCh = make(chan *pubsub.Message, 100)
+ p.pushCh = make(chan []byte, 100)
return nil
}
@@ -34,18 +34,14 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Publish(messages []*pubsub.Message) error {
- for i := 0; i < len(messages); i++ {
- p.pushCh <- messages[i]
- }
+func (p *Plugin) Publish(messages []byte) error {
+ p.pushCh <- messages
return nil
}
-func (p *Plugin) PublishAsync(messages []*pubsub.Message) {
+func (p *Plugin) PublishAsync(messages []byte) {
go func() {
- for i := 0; i < len(messages); i++ {
- p.pushCh <- messages[i]
- }
+ p.pushCh <- messages
}()
}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 93b13124..6c9a5650 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,7 @@ import (
"context"
"sync"
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/go-redis/redis/v8"
@@ -22,13 +21,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *pubsub.Message
+ out chan *message.Message
exit chan struct{}
}
func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *pubsub.Message, 100)
+ out := make(chan *message.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,14 +64,14 @@ func (fi *FanIn) read() {
if !ok {
return
}
- m := &pubsub.Message{}
- err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("failed to unmarshal payload", "error", err.Error())
- continue
- }
-
- fi.out <- m
+ //m := &pubsub.Message{}
+ //err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ //if err != nil {
+ // fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ // continue
+ //}
+
+ fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
case <-fi.exit:
return
}
@@ -95,6 +94,6 @@ func (fi *FanIn) Stop() error {
return nil
}
-func (fi *FanIn) Consume() <-chan *pubsub.Message {
+func (fi *FanIn) Consume() <-chan *message.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index c1480de8..3a21204e 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -7,8 +7,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const PluginName = "redis"
@@ -101,34 +103,36 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- return f.Err()
- }
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ t := fbsMsg.Table()
+ vec := t.ByteVector(0)
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec)
+ if f.Err() != nil {
+ return f.Err()
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []byte) {
go func() {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- continue
- }
- }
- }
+ //p.Lock()
+ //defer p.Unlock()
+ //for i := 0; i < len(msg); i++ {
+ // for j := 0; j < len(msg[i].Topics); j++ {
+ // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
+ // if f.Err() != nil {
+ // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
+ // continue
+ // }
+ // }
+ //}
}()
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 9b21ff8f..16cde0cc 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -15,6 +15,7 @@ import (
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -25,6 +26,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -284,39 +286,46 @@ func (p *Plugin) Reset() error {
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- if br, ok := p.pubsubs[msg[i].Broker]; ok {
- err := br.Publish(msg)
+ // Get payload
+ fbsMsg := message.GetRootAsMessages(m, 0)
+ tmpMsg := &message.Message{}
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ fbsMsg.Messages(tmpMsg, i)
+
+ for j := 0; j < tmpMsg.TopicsLength(); j++ {
+ if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
+ table := tmpMsg.Table()
+ err := br.Publish(table.ByteVector(0))
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
}
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
- go func() {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- err := p.pubsubs[msg[i].Broker].Publish(msg)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- return
- }
- }
- }
- }()
+func (p *Plugin) PublishAsync(msg []byte) {
+ //go func() {
+ // p.Lock()
+ // defer p.Unlock()
+ // for i := 0; i < len(msg); i++ {
+ // for j := 0; j < len(msg[i].Topics); j++ {
+ // err := p.pubsubs[msg[i].Broker].Publish(msg)
+ // if err != nil {
+ // p.log.Error("publish async error", "error", err)
+ // return
+ // }
+ // }
+ // }
+ //}()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 2fb0f1b9..d915aa43 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -2,7 +2,6 @@ package websockets
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -12,9 +11,9 @@ type rpc struct {
log logger.Logger
}
-func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
+func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published", "msg", msg)
+ r.log.Debug("message published")
// just return in case of nil message
if msg == nil {
@@ -31,7 +30,7 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
return nil
}
-func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
+func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
// just return in case of nil message
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 772b53ac..f5289752 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -13,9 +13,12 @@ import (
"time"
"github.com/fasthttp/websocket"
+ flatbuffers "github.com/google/flatbuffers/go"
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ message2 "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -883,3 +886,50 @@ func message(command string, broker string, payload []byte, topics ...string) *M
Payload: payload,
}
}
+
+func makePayload(b *flatbuffers.Builder, storage string, items []pubsub.Message) []byte {
+ b.Reset()
+
+ storageOffset := b.CreateString(storage)
+
+ // //////////////////// ITEMS VECTOR ////////////////////////////
+ offset := make([]flatbuffers.UOffsetT, len(items))
+ for i := len(items) - 1; i >= 0; i-- {
+ offset[i] = serializeItems(b, items[i])
+ }
+
+ message2.MessageStartTopicsVector(b, len(offset))
+
+ for i := len(offset) - 1; i >= 0; i-- {
+ b.PrependUOffsetT(offset[i])
+ }
+
+ itemsOffset := b.EndVector(len(offset))
+ // /////////////////////////////////////////////////////////////////
+
+ message2.MessageStart(b)
+ message2.MessagesAddMessages(b, itemsOffset)
+ message2.PayloadAddStorage(b, storageOffset)
+
+ finalOffset := message2.PayloadEnd(b)
+
+ b.Finish(finalOffset)
+
+ return b.Bytes[b.Head():]
+}
+
+func serializeItems(b *flatbuffers.Builder, item pubsub.Message) flatbuffers.UOffsetT {
+ br := b.CreateString(item.Broker)
+ cmd := b.CreateString(item.Command)
+ payload := b.CreateByteVector(item.Payload)
+
+
+
+ message2.MessageStart(b)
+
+ message2.MessageAddBroker(b, br)
+ message2.MessageAddCommand(b, cmd)
+ message2.MessageAddPayload(b, payload)
+
+ return message2.MessageEnd(b)
+}