summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 18:41:54 +0300
committerValery Piashchynski <[email protected]>2021-06-08 18:41:54 +0300
commita8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (patch)
treee7f43f625836456104bc0c39227b71e5e3cf848a /plugins/websockets
parent47c40407a7ca5f1391f4d3d504d0def166eac4e9 (diff)
- Move ws memory pub-sub plugin into the websockets folder
- Update CHANGELOG Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/memory/inMemory.go95
-rw-r--r--plugins/websockets/plugin.go6
2 files changed, 101 insertions, 0 deletions
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go
new file mode 100644
index 00000000..deb927ed
--- /dev/null
+++ b/plugins/websockets/memory/inMemory.go
@@ -0,0 +1,95 @@
+package memory
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/bst"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ log logger.Logger
+
+ // channel with the messages from the RPC
+ pushCh chan []byte
+ // user-subscribed topics
+ storage bst.Storage
+}
+
+func NewInMemory(log logger.Logger) pubsub.PubSub {
+ return &Plugin{
+ log: log,
+ pushCh: make(chan []byte, 10),
+ storage: bst.NewBST(),
+ }
+}
+
+func (p *Plugin) Publish(message []byte) error {
+ p.pushCh <- message
+ return nil
+}
+
+func (p *Plugin) PublishAsync(message []byte) {
+ go func() {
+ p.pushCh <- message
+ }()
+}
+
+func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(topics); i++ {
+ p.storage.Insert(connectionID, topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(topics); i++ {
+ p.storage.Remove(connectionID, topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+ p.RLock()
+ defer p.RUnlock()
+
+ ret := p.storage.Get(topic)
+ for rr := range ret {
+ res[rr] = struct{}{}
+ }
+}
+
+func (p *Plugin) Next() (*message.Message, error) {
+ msg := <-p.pushCh
+ if msg == nil {
+ return nil, nil
+ }
+
+ p.RLock()
+ defer p.RUnlock()
+
+ m := &message.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return nil, err
+ }
+
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(m.GetTopics()); i++ {
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(m.GetTopics()[i]); ok {
+ return m, nil
+ }
+ }
+ return nil, nil
+}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 39a4e139..cf21fffa 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -23,6 +23,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"google.golang.org/protobuf/proto"
@@ -90,6 +91,11 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
+ // attach default driver
+ if len(p.pubsubs) == 0 {
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+ }
+
p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,