summaryrefslogtreecommitdiff
path: root/plugins/memory/memorypubsub/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/memorypubsub/pubsub.go')
-rw-r--r--plugins/memory/memorypubsub/pubsub.go92
1 files changed, 92 insertions, 0 deletions
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
new file mode 100644
index 00000000..75122571
--- /dev/null
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -0,0 +1,92 @@
+package memorypubsub
+
+import (
+ "context"
+ "sync"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/bst"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type PubSubDriver struct {
+ sync.RWMutex
+ // channel with the messages from the RPC
+ pushCh chan *pubsub.Message
+ // user-subscribed topics
+ storage bst.Storage
+ log logger.Logger
+}
+
+func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
+ ps := &PubSubDriver{
+ pushCh: make(chan *pubsub.Message, 10),
+ storage: bst.NewBST(),
+ log: log,
+ }
+ return ps, nil
+}
+
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+ p.pushCh <- msg
+ return nil
+}
+
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
+ go func() {
+ p.pushCh <- msg
+ }()
+}
+
+func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(topics); i++ {
+ p.storage.Insert(connectionID, topics[i])
+ }
+ return nil
+}
+
+func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(topics); i++ {
+ p.storage.Remove(connectionID, topics[i])
+ }
+ return nil
+}
+
+func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
+ p.RLock()
+ defer p.RUnlock()
+
+ ret := p.storage.Get(topic)
+ for rr := range ret {
+ res[rr] = struct{}{}
+ }
+}
+
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("pubsub_memory")
+ select {
+ case msg := <-p.pushCh:
+ if msg == nil {
+ return nil, nil
+ }
+
+ p.RLock()
+ defer p.RUnlock()
+ // push only messages, which topics are subscibed
+ // TODO better???
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
+ }
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+
+ return nil, nil
+}