summaryrefslogtreecommitdiff
path: root/plugins/memory/memorypubsub/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/memorypubsub/pubsub.go')
-rw-r--r--plugins/memory/memorypubsub/pubsub.go92
1 files changed, 0 insertions, 92 deletions
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
deleted file mode 100644
index 231da134..00000000
--- a/plugins/memory/memorypubsub/pubsub.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package memorypubsub
-
-import (
- "context"
- "sync"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type PubSubDriver struct {
- sync.RWMutex
- // channel with the messages from the RPC
- pushCh chan *pubsub.Message
- // user-subscribed topics
- storage bst.Storage
- log logger.Logger
-}
-
-func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
- ps := &PubSubDriver{
- pushCh: make(chan *pubsub.Message, 100),
- storage: bst.NewBST(),
- log: log,
- }
- return ps, nil
-}
-
-func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
- p.pushCh <- msg
- return nil
-}
-
-func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
- go func() {
- p.pushCh <- msg
- }()
-}
-
-func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Insert(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Remove(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
- p.RLock()
- defer p.RUnlock()
-
- ret := p.storage.Get(topic)
- for rr := range ret {
- res[rr] = struct{}{}
- }
-}
-
-func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
- const op = errors.Op("pubsub_memory")
- select {
- case msg := <-p.pushCh:
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
- }
- case <-ctx.Done():
- return nil, errors.E(op, errors.TimeOut, ctx.Err())
- }
-
- return nil, nil
-}