summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/executor/executor.go3
-rw-r--r--plugins/websockets/memory/inMemory.go95
-rw-r--r--plugins/websockets/plugin.go40
-rw-r--r--plugins/websockets/pool/workers_pool.go54
-rw-r--r--plugins/websockets/rpc.go93
5 files changed, 163 insertions, 122 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 69aad7d4..e3d47166 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -8,6 +8,7 @@ import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
@@ -63,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &pubsub.Message{}
+ msg := &websocketsv1.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go
new file mode 100644
index 00000000..cef28182
--- /dev/null
+++ b/plugins/websockets/memory/inMemory.go
@@ -0,0 +1,95 @@
+package memory
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/bst"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ log logger.Logger
+
+ // channel with the messages from the RPC
+ pushCh chan []byte
+ // user-subscribed topics
+ storage bst.Storage
+}
+
+func NewInMemory(log logger.Logger) pubsub.PubSub {
+ return &Plugin{
+ log: log,
+ pushCh: make(chan []byte, 10),
+ storage: bst.NewBST(),
+ }
+}
+
+func (p *Plugin) Publish(message []byte) error {
+ p.pushCh <- message
+ return nil
+}
+
+func (p *Plugin) PublishAsync(message []byte) {
+ go func() {
+ p.pushCh <- message
+ }()
+}
+
+func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(topics); i++ {
+ p.storage.Insert(connectionID, topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(topics); i++ {
+ p.storage.Remove(connectionID, topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+ p.RLock()
+ defer p.RUnlock()
+
+ ret := p.storage.Get(topic)
+ for rr := range ret {
+ res[rr] = struct{}{}
+ }
+}
+
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
+ msg := <-p.pushCh
+ if msg == nil {
+ return nil, nil
+ }
+
+ p.RLock()
+ defer p.RUnlock()
+
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return nil, err
+ }
+
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(m.GetTopics()); i++ {
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(m.GetTopics()[i]); ok {
+ return m, nil
+ }
+ }
+ return nil, nil
+}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 4c0edcad..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -23,9 +23,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -79,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -301,16 +305,21 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return err
+ }
+
// Get payload
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
return nil
@@ -320,16 +329,21 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
p.log.Error("publish async error", "error", err)
- return
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
}()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 544f3ede..1a7c6f8a 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,25 +1,22 @@
package pool
import (
- "bytes"
"sync"
"github.com/fasthttp/websocket"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
- bPool sync.Pool
log logger.Logger
- queue chan *message.Message
+ queue chan *websocketsv1.Message
exit chan struct{}
}
@@ -27,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *message.Message, 100),
+ queue: make(chan *websocketsv1.Message, 100),
storage: pubsubs,
log: log,
exit: make(chan struct{}),
@@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
- wp.bPool.New = func() interface{} {
- return new(bytes.Buffer)
- }
// start 10 workers
for i := 0; i < 50; i++ {
@@ -48,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *message.Message) {
+func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
wp.queue <- msg
}
@@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
-func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
- b.Reset()
- wp.bPool.Put(b)
-}
-
-func (wp *WorkersPool) getBytes() *bytes.Buffer {
- return wp.bPool.Get().(*bytes.Buffer)
-}
-
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg == nil {
continue
}
- if msg.TopicsLength() == 0 {
+ if len(msg.GetTopics()) == 0 {
continue
}
- br, ok := wp.storage[utils.AsString(msg.Broker())]
+ br, ok := wp.storage[msg.Broker]
if !ok {
- wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
continue
}
res := wp.get()
- bb := wp.getBytes()
- for i := 0; i < msg.TopicsLength(); i++ {
+ for i := 0; i < len(msg.GetTopics()); i++ {
// get connections for the particular topic
- br.Connections(utils.AsString(msg.Topics(i)), res)
+ br.Connections(msg.GetTopics()[i], res)
}
if len(res) == 0 {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
}
- wp.putBytes(bb)
wp.put(res)
continue
}
@@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
@@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit
conn := c.(*connection.Connection)
// put data into the bytes buffer
- for i := 0; i < msg.PayloadLength(); i++ {
- bb.WriteByte(byte(msg.Payload(i)))
- }
- err := conn.Write(websocket.BinaryMessage, bb.Bytes())
+ err := conn.Write(websocket.BinaryMessage, msg.GetPayload())
if err != nil {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
}
- // put bytes buffer back
- wp.putBytes(bb)
// put map with results back
wp.put(res)
case <-wp.exit:
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 6c2cacb4..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,10 +1,10 @@
package websockets
import (
- flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -13,39 +13,32 @@ type rpc struct {
log logger.Logger
}
-// Publish ... msg is a flatbuffers decoded payload
+// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(msg []byte, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
+ r.log.Debug("message published", "msg", in.Messages)
- b := flatbuffers.NewBuilder(100)
+ msgLen := len(in.GetMessages())
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
-
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
- err := r.plugin.Publish(bb)
+ err = r.plugin.Publish(bb)
if err != nil {
*ok = false
- b.Reset()
return errors.E(op, err)
}
- b.Reset()
}
*ok = true
@@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
- r.log.Debug("message published", "msg", msg)
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
+ const op = errors.Op("publish_async")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
-
- b := flatbuffers.NewBuilder(100)
+ r.log.Debug("message published", "msg", in.Messages)
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
+ msgLen := len(in.GetMessages())
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
r.plugin.PublishAsync(bb)
- b.Reset()
}
*ok = true
return nil
}
-
-func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
- cmdOff := b.CreateByteString(msg.Command())
- brokerOff := b.CreateByteString(msg.Broker())
-
- offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
- for j := msg.TopicsLength() - 1; j >= 0; j-- {
- offsets[j] = b.CreateByteString(msg.Topics(j))
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- bb := make([]byte, msg.PayloadLength())
- for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
- }
- pOff := b.CreateByteVector(bb)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- fOff := message.MessageEnd(b)
- b.Finish(fOff)
-
- return b.FinishedBytes()
-}