summaryrefslogtreecommitdiff
path: root/plugins/redis/fanin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
committerValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
commitdc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch)
tree6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /plugins/redis/fanin.go
parentd2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff)
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r--plugins/redis/fanin.go100
1 files changed, 100 insertions, 0 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
new file mode 100644
index 00000000..29016720
--- /dev/null
+++ b/plugins/redis/fanin.go
@@ -0,0 +1,100 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type FanIn struct {
+ sync.Mutex
+
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan pubsub.Message
+
+ exit chan struct{}
+}
+
+func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+ out := make(chan pubsub.Message, 100)
+ fi := &FanIn{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (fi *FanIn) AddChannel(topics ...string) error {
+ const op = errors.Op("fanin_addchannel")
+ err := fi.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (fi *FanIn) read() {
+ for {
+ select {
+ //here we receive message from us (which we sent before in Publish)
+ //it should be compatible with the websockets.Msg interface
+ //payload should be in the redis.message.payload field
+
+ case msg, ok := <-fi.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+ m := &pubsub.Msg{}
+ err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ continue
+ }
+
+ fi.out <- m
+ case <-fi.exit:
+ return
+ }
+ }
+}
+
+func (fi *FanIn) RemoveChannel(topics ...string) error {
+ const op = errors.Op("fanin_remove")
+ err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (fi *FanIn) Stop() error {
+ fi.exit <- struct{}{}
+ close(fi.out)
+ close(fi.exit)
+ return nil
+}
+
+func (fi *FanIn) Consume() <-chan pubsub.Message {
+ return fi.out
+}