diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 18:41:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-08 18:41:54 +0300 |
commit | a8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (patch) | |
tree | e7f43f625836456104bc0c39227b71e5e3cf848a /plugins/websockets | |
parent | 47c40407a7ca5f1391f4d3d504d0def166eac4e9 (diff) |
- Move ws memory pub-sub plugin into the websockets folder
- Update CHANGELOG
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/memory/inMemory.go | 95 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 6 |
2 files changed, 101 insertions, 0 deletions
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go new file mode 100644 index 00000000..deb927ed --- /dev/null +++ b/plugins/websockets/memory/inMemory.go @@ -0,0 +1,95 @@ +package memory + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/pkg/bst" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" +) + +type Plugin struct { + sync.RWMutex + log logger.Logger + + // channel with the messages from the RPC + pushCh chan []byte + // user-subscribed topics + storage bst.Storage +} + +func NewInMemory(log logger.Logger) pubsub.PubSub { + return &Plugin{ + log: log, + pushCh: make(chan []byte, 10), + storage: bst.NewBST(), + } +} + +func (p *Plugin) Publish(message []byte) error { + p.pushCh <- message + return nil +} + +func (p *Plugin) PublishAsync(message []byte) { + go func() { + p.pushCh <- message + }() +} + +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() + for i := 0; i < len(topics); i++ { + p.storage.Insert(connectionID, topics[i]) + } + return nil +} + +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() + for i := 0; i < len(topics); i++ { + p.storage.Remove(connectionID, topics[i]) + } + return nil +} + +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + p.RLock() + defer p.RUnlock() + + ret := p.storage.Get(topic) + for rr := range ret { + res[rr] = struct{}{} + } +} + +func (p *Plugin) Next() (*message.Message, error) { + msg := <-p.pushCh + if msg == nil { + return nil, nil + } + + p.RLock() + defer p.RUnlock() + + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return nil, err + } + + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(m.GetTopics()); i++ { + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(m.GetTopics()[i]); ok { + return m, nil + } + } + return nil, nil +} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 39a4e139..cf21fffa 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -23,6 +23,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "google.golang.org/protobuf/proto" @@ -90,6 +91,11 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() + // attach default driver + if len(p.pubsubs) == 0 { + p.pubsubs["memory"] = memory.NewInMemory(p.log) + } + p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, |