diff options
Diffstat (limited to 'plugins/memory/memorypubsub/pubsub.go')
-rw-r--r-- | plugins/memory/memorypubsub/pubsub.go | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go new file mode 100644 index 00000000..75122571 --- /dev/null +++ b/plugins/memory/memorypubsub/pubsub.go @@ -0,0 +1,92 @@ +package memorypubsub + +import ( + "context" + "sync" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/pubsub" + "github.com/spiral/roadrunner/v2/pkg/bst" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type PubSubDriver struct { + sync.RWMutex + // channel with the messages from the RPC + pushCh chan *pubsub.Message + // user-subscribed topics + storage bst.Storage + log logger.Logger +} + +func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) { + ps := &PubSubDriver{ + pushCh: make(chan *pubsub.Message, 10), + storage: bst.NewBST(), + log: log, + } + return ps, nil +} + +func (p *PubSubDriver) Publish(msg *pubsub.Message) error { + p.pushCh <- msg + return nil +} + +func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { + go func() { + p.pushCh <- msg + }() +} + +func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() + for i := 0; i < len(topics); i++ { + p.storage.Insert(connectionID, topics[i]) + } + return nil +} + +func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() + for i := 0; i < len(topics); i++ { + p.storage.Remove(connectionID, topics[i]) + } + return nil +} + +func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { + p.RLock() + defer p.RUnlock() + + ret := p.storage.Get(topic) + for rr := range ret { + res[rr] = struct{}{} + } +} + +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("pubsub_memory") + select { + case msg := <-p.pushCh: + if msg == nil { + return nil, nil + } + + p.RLock() + defer p.RUnlock() + // push only messages, which topics are subscibed + // TODO better??? + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(msg.Topic); ok { + return msg, nil + } + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } + + return nil, nil +} |