diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
commit | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch) | |
tree | d940de0ee304d3edb60daa35568c3f186dc6a8b5 | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) |
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | pkg/pubsub/interface.go | 4 | ||||
-rw-r--r-- | pkg/pubsub/message.fbs (renamed from plugins/websockets/schema/message.fbs) | 6 | ||||
-rw-r--r-- | pkg/pubsub/message/Message.go (renamed from plugins/websockets/schema/message/Message.go) | 0 | ||||
-rw-r--r-- | pkg/pubsub/message/Messages.go | 67 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 16 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 25 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 42 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 49 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 7 | ||||
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 50 |
10 files changed, 197 insertions, 69 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index caf8783f..63a12b34 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -19,11 +19,11 @@ type Subscriber interface { // Publisher publish one or more messages type Publisher interface { // Publish one or multiple Channel. - Publish(messages []*Message) error + Publish(messages []byte) error // PublishAsync publish message and return immediately // If error occurred it will be printed into the logger - PublishAsync(messages []*Message) + PublishAsync(messages []byte) } // Reader interface should return next message diff --git a/plugins/websockets/schema/message.fbs b/pkg/pubsub/message.fbs index f2d92c78..7e975894 100644 --- a/plugins/websockets/schema/message.fbs +++ b/pkg/pubsub/message.fbs @@ -7,4 +7,8 @@ table Message { payload:[byte]; } -root_type Message; +table Messages { + messages:[Message]; +} + +root_type Messages; diff --git a/plugins/websockets/schema/message/Message.go b/pkg/pubsub/message/Message.go index 26bbd12c..26bbd12c 100644 --- a/plugins/websockets/schema/message/Message.go +++ b/pkg/pubsub/message/Message.go diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go new file mode 100644 index 00000000..633b367d --- /dev/null +++ b/pkg/pubsub/message/Messages.go @@ -0,0 +1,67 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Messages struct { + _tab flatbuffers.Table +} + +func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Messages{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Messages{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Messages) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Messages) Messages(obj *Message, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Messages) MessagesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MessagesStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0) +} +func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 49c187bc..b9c5933a 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -15,14 +15,14 @@ type Plugin struct { log logger.Logger // channel with the messages from the RPC - pushCh chan *pubsub.Message + pushCh chan []byte // user-subscribed topics topics sync.Map } func (p *Plugin) Init(log logger.Logger) error { p.log = log - p.pushCh = make(chan *pubsub.Message, 100) + p.pushCh = make(chan []byte, 100) return nil } @@ -34,18 +34,14 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Publish(messages []*pubsub.Message) error { - for i := 0; i < len(messages); i++ { - p.pushCh <- messages[i] - } +func (p *Plugin) Publish(messages []byte) error { + p.pushCh <- messages return nil } -func (p *Plugin) PublishAsync(messages []*pubsub.Message) { +func (p *Plugin) PublishAsync(messages []byte) { go func() { - for i := 0; i < len(messages); i++ { - p.pushCh <- messages[i] - } + p.pushCh <- messages }() } diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 93b13124..6c9a5650 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,8 +4,7 @@ import ( "context" "sync" - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/go-redis/redis/v8" @@ -22,13 +21,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *pubsub.Message + out chan *message.Message exit chan struct{} } func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *pubsub.Message, 100) + out := make(chan *message.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -65,14 +64,14 @@ func (fi *FanIn) read() { if !ok { return } - m := &pubsub.Message{} - err := json.Unmarshal(utils.AsBytes(msg.Payload), m) - if err != nil { - fi.log.Error("failed to unmarshal payload", "error", err.Error()) - continue - } - - fi.out <- m + //m := &pubsub.Message{} + //err := json.Unmarshal(utils.AsBytes(msg.Payload), m) + //if err != nil { + // fi.log.Error("failed to unmarshal payload", "error", err.Error()) + // continue + //} + + fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) case <-fi.exit: return } @@ -95,6 +94,6 @@ func (fi *FanIn) Stop() error { return nil } -func (fi *FanIn) Consume() <-chan *pubsub.Message { +func (fi *FanIn) Consume() <-chan *message.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index c1480de8..3a21204e 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -7,8 +7,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const PluginName = "redis" @@ -101,34 +103,36 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - return f.Err() - } + fbsMsg := message.GetRootAsMessage(msg, 0) + + for j := 0; j < fbsMsg.TopicsLength(); j++ { + t := fbsMsg.Table() + vec := t.ByteVector(0) + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec) + if f.Err() != nil { + return f.Err() } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { +func (p *Plugin) PublishAsync(msg []byte) { go func() { - p.Lock() - defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - continue - } - } - } + //p.Lock() + //defer p.Unlock() + //for i := 0; i < len(msg); i++ { + // for j := 0; j < len(msg[i].Topics); j++ { + // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) + // if f.Err() != nil { + // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) + // continue + // } + // } + //} }() } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 9b21ff8f..16cde0cc 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -15,6 +15,7 @@ import ( phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -25,6 +26,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -284,39 +286,46 @@ func (p *Plugin) Reset() error { } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - if br, ok := p.pubsubs[msg[i].Broker]; ok { - err := br.Publish(msg) + // Get payload + fbsMsg := message.GetRootAsMessages(m, 0) + tmpMsg := &message.Message{} + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + fbsMsg.Messages(tmpMsg, i) + + for j := 0; j < tmpMsg.TopicsLength(); j++ { + if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok { + table := tmpMsg.Table() + err := br.Publish(table.ByteVector(0)) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker()) } } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { - go func() { - p.Lock() - defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - err := p.pubsubs[msg[i].Broker].Publish(msg) - if err != nil { - p.log.Error("publish async error", "error", err) - return - } - } - } - }() +func (p *Plugin) PublishAsync(msg []byte) { + //go func() { + // p.Lock() + // defer p.Unlock() + // for i := 0; i < len(msg); i++ { + // for j := 0; j < len(msg[i].Topics); j++ { + // err := p.pubsubs[msg[i].Broker].Publish(msg) + // if err != nil { + // p.log.Error("publish async error", "error", err) + // return + // } + // } + // } + //}() } func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 2fb0f1b9..d915aa43 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -2,7 +2,6 @@ package websockets import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,9 +11,9 @@ type rpc struct { log logger.Logger } -func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { +func (r *rpc) Publish(msg []byte, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published", "msg", msg) + r.log.Debug("message published") // just return in case of nil message if msg == nil { @@ -31,7 +30,7 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { return nil } -func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { +func (r *rpc) PublishAsync(msg []byte, ok *bool) error { r.log.Debug("message published", "msg", msg) // just return in case of nil message diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 772b53ac..f5289752 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -13,9 +13,12 @@ import ( "time" "github.com/fasthttp/websocket" + flatbuffers "github.com/google/flatbuffers/go" json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + message2 "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -883,3 +886,50 @@ func message(command string, broker string, payload []byte, topics ...string) *M Payload: payload, } } + +func makePayload(b *flatbuffers.Builder, storage string, items []pubsub.Message) []byte { + b.Reset() + + storageOffset := b.CreateString(storage) + + // //////////////////// ITEMS VECTOR //////////////////////////// + offset := make([]flatbuffers.UOffsetT, len(items)) + for i := len(items) - 1; i >= 0; i-- { + offset[i] = serializeItems(b, items[i]) + } + + message2.MessageStartTopicsVector(b, len(offset)) + + for i := len(offset) - 1; i >= 0; i-- { + b.PrependUOffsetT(offset[i]) + } + + itemsOffset := b.EndVector(len(offset)) + // ///////////////////////////////////////////////////////////////// + + message2.MessageStart(b) + message2.MessagesAddMessages(b, itemsOffset) + message2.PayloadAddStorage(b, storageOffset) + + finalOffset := message2.PayloadEnd(b) + + b.Finish(finalOffset) + + return b.Bytes[b.Head():] +} + +func serializeItems(b *flatbuffers.Builder, item pubsub.Message) flatbuffers.UOffsetT { + br := b.CreateString(item.Broker) + cmd := b.CreateString(item.Command) + payload := b.CreateByteVector(item.Payload) + + + + message2.MessageStart(b) + + message2.MessageAddBroker(b, br) + message2.MessageAddCommand(b, cmd) + message2.MessageAddPayload(b, payload) + + return message2.MessageEnd(b) +} |