summaryrefslogtreecommitdiff
path: root/plugins/memory/memorypubsub/pubsub.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
committerValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
commitf3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch)
tree32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /plugins/memory/memorypubsub/pubsub.go
parent5d2cd55ab522d4f1e65a833f91146444465a32ac (diff)
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memory/memorypubsub/pubsub.go')
-rw-r--r--plugins/memory/memorypubsub/pubsub.go92
1 files changed, 0 insertions, 92 deletions
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
deleted file mode 100644
index 231da134..00000000
--- a/plugins/memory/memorypubsub/pubsub.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package memorypubsub
-
-import (
- "context"
- "sync"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type PubSubDriver struct {
- sync.RWMutex
- // channel with the messages from the RPC
- pushCh chan *pubsub.Message
- // user-subscribed topics
- storage bst.Storage
- log logger.Logger
-}
-
-func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
- ps := &PubSubDriver{
- pushCh: make(chan *pubsub.Message, 100),
- storage: bst.NewBST(),
- log: log,
- }
- return ps, nil
-}
-
-func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
- p.pushCh <- msg
- return nil
-}
-
-func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
- go func() {
- p.pushCh <- msg
- }()
-}
-
-func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Insert(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Remove(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
- p.RLock()
- defer p.RUnlock()
-
- ret := p.storage.Get(topic)
- for rr := range ret {
- res[rr] = struct{}{}
- }
-}
-
-func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
- const op = errors.Op("pubsub_memory")
- select {
- case msg := <-p.pushCh:
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
- }
- case <-ctx.Done():
- return nil, errors.E(op, errors.TimeOut, ctx.Err())
- }
-
- return nil, nil
-}