summaryrefslogtreecommitdiff
path: root/plugins/redis/fanin.go
blob: 321bfaaae7a01d02c1275ecd969aff65e881694b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package redis

import (
	"context"
	"sync"

	"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
	"github.com/spiral/roadrunner/v2/plugins/logger"

	"github.com/go-redis/redis/v8"
	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/utils"
)

type FanIn struct {
	sync.Mutex

	// redis client
	client redis.UniversalClient
	pubsub *redis.PubSub

	log logger.Logger

	// out channel with all subs
	out chan *message.Message

	exit chan struct{}
}

func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
	out := make(chan *message.Message, 100)
	fi := &FanIn{
		out:    out,
		client: redisClient,
		pubsub: redisClient.Subscribe(context.Background()),
		exit:   make(chan struct{}),
		log:    log,
	}

	// start reading messages
	go fi.read()

	return fi
}

func (fi *FanIn) sub(topics ...string) error {
	const op = errors.Op("fanin_addchannel")
	err := fi.pubsub.Subscribe(context.Background(), topics...)
	if err != nil {
		return errors.E(op, err)
	}
	return nil
}

// read reads messages from the pubsub subscription
func (fi *FanIn) read() {
	for {
		select {
		// here we receive message from us (which we sent before in Publish)
		// it should be compatible with the websockets.Msg interface
		// payload should be in the redis.message.payload field

		case msg, ok := <-fi.pubsub.Channel():
			// channel closed
			if !ok {
				return
			}
			fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
		case <-fi.exit:
			return
		}
	}
}

func (fi *FanIn) unsub(topic string) error {
	const op = errors.Op("fanin_remove")
	err := fi.pubsub.Unsubscribe(context.Background(), topic)
	if err != nil {
		return errors.E(op, err)
	}
	return nil
}

func (fi *FanIn) stop() error {
	fi.exit <- struct{}{}
	close(fi.out)
	close(fi.exit)
	return nil
}

func (fi *FanIn) consume() <-chan *message.Message {
	return fi.out
}