summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/memory/plugin.go13
-rw-r--r--plugins/redis/fanin.go7
-rw-r--r--plugins/redis/plugin.go28
-rw-r--r--plugins/websockets/plugin.go55
-rw-r--r--plugins/websockets/pool/workers_pool.go26
-rw-r--r--plugins/websockets/rpc.go87
-rw-r--r--plugins/websockets/storage/storage.go11
7 files changed, 154 insertions, 73 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index b9c5933a..eb87b39e 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -3,8 +3,9 @@ package memory
import (
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -59,18 +60,20 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
return nil
}
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
}
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < len(msg.Topics); i++ {
- if _, ok := p.topics.Load(msg.Topics[i]); ok {
- return msg, nil
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok {
+ return fbsMsg, nil
}
}
return nil, nil
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 6c9a5650..3082f24f 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -64,13 +64,6 @@ func (fi *FanIn) read() {
if !ok {
return
}
- //m := &pubsub.Message{}
- //err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
- //if err != nil {
- // fi.log.Error("failed to unmarshal payload", "error", err.Error())
- // continue
- //}
-
fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
case <-fi.exit:
return
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3a21204e..5b9de5fc 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,7 +6,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error {
fbsMsg := message.GetRootAsMessage(msg, 0)
for j := 0; j < fbsMsg.TopicsLength(); j++ {
- t := fbsMsg.Table()
- vec := t.ByteVector(0)
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec)
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
if f.Err() != nil {
return f.Err()
}
@@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error {
func (p *Plugin) PublishAsync(msg []byte) {
go func() {
- //p.Lock()
- //defer p.Unlock()
- //for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- // if f.Err() != nil {
- // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- // continue
- // }
- // }
- //}
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
+ return
+ }
+ }
}()
}
@@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
return <-p.fanin.Consume(), nil
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 16cde0cc..fe55d30e 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -227,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
-
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -291,41 +290,37 @@ func (p *Plugin) Publish(m []byte) error {
defer p.Unlock()
// Get payload
- fbsMsg := message.GetRootAsMessages(m, 0)
- tmpMsg := &message.Message{}
-
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- fbsMsg.Messages(tmpMsg, i)
-
- for j := 0; j < tmpMsg.TopicsLength(); j++ {
- if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
- table := tmpMsg.Table()
- err := br.Publish(table.ByteVector(0))
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ return errors.E(err)
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []byte) {
- //go func() {
- // p.Lock()
- // defer p.Unlock()
- // for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // err := p.pubsubs[msg[i].Broker].Publish(msg)
- // if err != nil {
- // p.log.Error("publish async error", "error", err)
- // return
- // }
- // }
- // }
- //}()
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ }
+ }
+ }()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 8f18580f..7fcc873b 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,10 +4,11 @@ import (
"sync"
"github.com/fasthttp/websocket"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
@@ -16,7 +17,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *pubsub.Message
+ queue chan *message.Message
exit chan struct{}
}
@@ -24,7 +25,7 @@ type WorkersPool struct {
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *pubsub.Message, 100),
+ queue: make(chan *message.Message, 100),
storage: storage,
log: log,
exit: make(chan struct{}),
@@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return wp
}
-func (wp *WorkersPool) Queue(msg *pubsub.Message) {
+func (wp *WorkersPool) Queue(msg *message.Message) {
wp.queue <- msg
}
@@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if !ok {
return
}
- // do not handle nil's
+ _ = msg
if msg == nil {
continue
}
- if len(msg.Topics) == 0 {
+ if msg.TopicsLength() == 0 {
continue
}
res := wp.get()
- // get connections for the particular topic
- wp.storage.GetByPtr(msg.Topics, res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ // get connections for the particular topic
+ wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ }
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.Topics)
wp.put(res)
@@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit
}
conn := c.(*connection.Connection)
- err := conn.Write(websocket.BinaryMessage, msg.Payload)
+ // TODO sync pool for the bytes
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ err := conn.Write(websocket.BinaryMessage, bb)
if err != nil {
wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
wp.put(res)
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index d915aa43..6c2cacb4 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,7 +1,9 @@
package websockets
import (
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -11,6 +13,8 @@ type rpc struct {
log logger.Logger
}
+// Publish ... msg is a flatbuffers decoded payload
+// see: pkg/pubsub/message.fbs
func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
r.log.Debug("message published")
@@ -21,15 +25,35 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
return nil
}
- err := r.plugin.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ err := r.plugin.Publish(bb)
+ if err != nil {
+ *ok = false
+ b.Reset()
+ return errors.E(op, err)
+ }
+ b.Reset()
}
+
*ok = true
return nil
}
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
@@ -38,9 +62,60 @@ func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
*ok = true
return nil
}
- // publish to the registered broker
- r.plugin.PublishAsync(msg)
+
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ r.plugin.PublishAsync(bb)
+ b.Reset()
+ }
*ok = true
return nil
}
+
+func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
+ cmdOff := b.CreateByteString(msg.Command())
+ brokerOff := b.CreateByteString(msg.Broker())
+
+ offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
+ for j := msg.TopicsLength() - 1; j >= 0; j-- {
+ offsets[j] = b.CreateByteString(msg.Topics(j))
+ }
+
+ message.MessageStartTopicsVector(b, len(offsets))
+
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
+
+ tOff := b.EndVector(len(offsets))
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ pOff := b.CreateByteVector(bb)
+
+ message.MessageStart(b)
+
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ fOff := message.MessageEnd(b)
+ b.Finish(fOff)
+
+ return b.FinishedBytes()
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
index ac256be2..43834658 100644
--- a/plugins/websockets/storage/storage.go
+++ b/plugins/websockets/storage/storage.go
@@ -63,6 +63,17 @@ func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) {
}
}
}
+func (s *Storage) GetOneByPtr(topic string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ d := s.BST.Get(topic)
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+}
func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) {
s.RLock()