diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/memory/plugin.go | 13 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 7 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 28 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 55 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 26 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 87 | ||||
-rw-r--r-- | plugins/websockets/storage/storage.go | 11 |
7 files changed, 154 insertions, 73 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index b9c5933a..eb87b39e 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -3,8 +3,9 @@ package memory import ( "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -59,18 +60,20 @@ func (p *Plugin) Unsubscribe(topics ...string) error { return nil } -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil } + fbsMsg := message.GetRootAsMessage(msg, 0) + // push only messages, which are subscribed // TODO better??? - for i := 0; i < len(msg.Topics); i++ { - if _, ok := p.topics.Load(msg.Topics[i]); ok { - return msg, nil + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok { + return fbsMsg, nil } } return nil, nil diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 6c9a5650..3082f24f 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -64,13 +64,6 @@ func (fi *FanIn) read() { if !ok { return } - //m := &pubsub.Message{} - //err := json.Unmarshal(utils.AsBytes(msg.Payload), m) - //if err != nil { - // fi.log.Error("failed to unmarshal payload", "error", err.Error()) - // continue - //} - fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) case <-fi.exit: return diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 3a21204e..5b9de5fc 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,6 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error { fbsMsg := message.GetRootAsMessage(msg, 0) for j := 0; j < fbsMsg.TopicsLength(); j++ { - t := fbsMsg.Table() - vec := t.ByteVector(0) - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec) + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) if f.Err() != nil { return f.Err() } @@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error { func (p *Plugin) PublishAsync(msg []byte) { go func() { - //p.Lock() - //defer p.Unlock() - //for i := 0; i < len(msg); i++ { - // for j := 0; j < len(msg[i].Topics); j++ { - // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - // if f.Err() != nil { - // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - // continue - // } - // } - //} + p.Lock() + defer p.Unlock() + fbsMsg := message.GetRootAsMessage(msg, 0) + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) + return + } + } }() } @@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { return <-p.fanin.Consume(), nil } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 16cde0cc..fe55d30e 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -227,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } - p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -291,41 +290,37 @@ func (p *Plugin) Publish(m []byte) error { defer p.Unlock() // Get payload - fbsMsg := message.GetRootAsMessages(m, 0) - tmpMsg := &message.Message{} - - for i := 0; i < fbsMsg.MessagesLength(); i++ { - fbsMsg.Messages(tmpMsg, i) - - for j := 0; j < tmpMsg.TopicsLength(); j++ { - if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok { - table := tmpMsg.Table() - err := br.Publish(table.ByteVector(0)) - if err != nil { - return errors.E(err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker()) + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) + if err != nil { + return errors.E(err) } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) } } return nil } -func (p *Plugin) PublishAsync(msg []byte) { - //go func() { - // p.Lock() - // defer p.Unlock() - // for i := 0; i < len(msg); i++ { - // for j := 0; j < len(msg[i].Topics); j++ { - // err := p.pubsubs[msg[i].Broker].Publish(msg) - // if err != nil { - // p.log.Error("publish async error", "error", err) - // return - // } - // } - // } - //}() +func (p *Plugin) PublishAsync(m []byte) { + go func() { + p.Lock() + defer p.Unlock() + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) + if err != nil { + p.log.Error("publish async error", "error", err) + return + } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + } + } + }() } func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 8f18580f..7fcc873b 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,10 +4,11 @@ import ( "sync" "github.com/fasthttp/websocket" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" + "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { @@ -16,7 +17,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *pubsub.Message + queue chan *message.Message exit chan struct{} } @@ -24,7 +25,7 @@ type WorkersPool struct { func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *pubsub.Message, 100), + queue: make(chan *message.Message, 100), storage: storage, log: log, exit: make(chan struct{}), @@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. return wp } -func (wp *WorkersPool) Queue(msg *pubsub.Message) { +func (wp *WorkersPool) Queue(msg *message.Message) { wp.queue <- msg } @@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit if !ok { return } - // do not handle nil's + _ = msg if msg == nil { continue } - if len(msg.Topics) == 0 { + if msg.TopicsLength() == 0 { continue } res := wp.get() - // get connections for the particular topic - wp.storage.GetByPtr(msg.Topics, res) + for i := 0; i < msg.TopicsLength(); i++ { + // get connections for the particular topic + wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res) + } if len(res) == 0 { wp.log.Info("no such topic", "topic", msg.Topics) wp.put(res) @@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit } conn := c.(*connection.Connection) - err := conn.Write(websocket.BinaryMessage, msg.Payload) + // TODO sync pool for the bytes + bb := make([]byte, msg.PayloadLength()) + for i := 0; i < msg.PayloadLength(); i++ { + bb[i] = byte(msg.Payload(i)) + } + err := conn.Write(websocket.BinaryMessage, bb) if err != nil { wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) wp.put(res) diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index d915aa43..6c2cacb4 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -1,7 +1,9 @@ package websockets import ( + flatbuffers "github.com/google/flatbuffers/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -11,6 +13,8 @@ type rpc struct { log logger.Logger } +// Publish ... msg is a flatbuffers decoded payload +// see: pkg/pubsub/message.fbs func (r *rpc) Publish(msg []byte, ok *bool) error { const op = errors.Op("broadcast_publish") r.log.Debug("message published") @@ -21,15 +25,35 @@ func (r *rpc) Publish(msg []byte, ok *bool) error { return nil } - err := r.plugin.Publish(msg) - if err != nil { - *ok = false - return errors.E(op, err) + fbsMsg := message.GetRootAsMessages(msg, 0) + tmpMsg := &message.Message{} + + b := flatbuffers.NewBuilder(100) + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + // init a message + fbsMsg.Messages(tmpMsg, i) + + // overhead HERE + orig := serializeMsg(b, tmpMsg) + bb := make([]byte, len(orig)) + copy(bb, orig) + + err := r.plugin.Publish(bb) + if err != nil { + *ok = false + b.Reset() + return errors.E(op, err) + } + b.Reset() } + *ok = true return nil } +// PublishAsync ... +// see: pkg/pubsub/message.fbs func (r *rpc) PublishAsync(msg []byte, ok *bool) error { r.log.Debug("message published", "msg", msg) @@ -38,9 +62,60 @@ func (r *rpc) PublishAsync(msg []byte, ok *bool) error { *ok = true return nil } - // publish to the registered broker - r.plugin.PublishAsync(msg) + + fbsMsg := message.GetRootAsMessages(msg, 0) + tmpMsg := &message.Message{} + + b := flatbuffers.NewBuilder(100) + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + // init a message + fbsMsg.Messages(tmpMsg, i) + + // overhead HERE + orig := serializeMsg(b, tmpMsg) + bb := make([]byte, len(orig)) + copy(bb, orig) + + r.plugin.PublishAsync(bb) + b.Reset() + } *ok = true return nil } + +func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte { + cmdOff := b.CreateByteString(msg.Command()) + brokerOff := b.CreateByteString(msg.Broker()) + + offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength()) + for j := msg.TopicsLength() - 1; j >= 0; j-- { + offsets[j] = b.CreateByteString(msg.Topics(j)) + } + + message.MessageStartTopicsVector(b, len(offsets)) + + for j := len(offsets) - 1; j >= 0; j-- { + b.PrependUOffsetT(offsets[j]) + } + + tOff := b.EndVector(len(offsets)) + bb := make([]byte, msg.PayloadLength()) + for i := 0; i < msg.PayloadLength(); i++ { + bb[i] = byte(msg.Payload(i)) + } + pOff := b.CreateByteVector(bb) + + message.MessageStart(b) + + message.MessageAddCommand(b, cmdOff) + message.MessageAddBroker(b, brokerOff) + message.MessageAddTopics(b, tOff) + message.MessageAddPayload(b, pOff) + + fOff := message.MessageEnd(b) + b.Finish(fOff) + + return b.FinishedBytes() +} diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go index ac256be2..43834658 100644 --- a/plugins/websockets/storage/storage.go +++ b/plugins/websockets/storage/storage.go @@ -63,6 +63,17 @@ func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) { } } } +func (s *Storage) GetOneByPtr(topic string, res map[string]struct{}) { + s.RLock() + defer s.RUnlock() + + d := s.BST.Get(topic) + if len(d) > 0 { + for ii := range d { + res[ii] = struct{}{} + } + } +} func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) { s.RLock() |