diff options
Diffstat (limited to 'plugins/memory/memorypubsub/pubsub.go')
-rw-r--r-- | plugins/memory/memorypubsub/pubsub.go | 92 |
1 files changed, 0 insertions, 92 deletions
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go deleted file mode 100644 index 231da134..00000000 --- a/plugins/memory/memorypubsub/pubsub.go +++ /dev/null @@ -1,92 +0,0 @@ -package memorypubsub - -import ( - "context" - "sync" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type PubSubDriver struct { - sync.RWMutex - // channel with the messages from the RPC - pushCh chan *pubsub.Message - // user-subscribed topics - storage bst.Storage - log logger.Logger -} - -func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) { - ps := &PubSubDriver{ - pushCh: make(chan *pubsub.Message, 100), - storage: bst.NewBST(), - log: log, - } - return ps, nil -} - -func (p *PubSubDriver) Publish(msg *pubsub.Message) error { - p.pushCh <- msg - return nil -} - -func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { - go func() { - p.pushCh <- msg - }() -} - -func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { - const op = errors.Op("pubsub_memory") - select { - case msg := <-p.pushCh: - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil - } - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } - - return nil, nil -} |