summaryrefslogtreecommitdiff
path: root/plugins/redis/fanin.go
blob: 29016720827374c4631b793b0b4ba9a3bf0ac9af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package redis

import (
	"context"
	"sync"

	json "github.com/json-iterator/go"
	"github.com/spiral/roadrunner/v2/pkg/pubsub"
	"github.com/spiral/roadrunner/v2/plugins/logger"

	"github.com/go-redis/redis/v8"
	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/utils"
)

// FanIn reads messages from a single redis pubsub subscription, decodes them
// and exposes them on one outgoing channel (see Consume).
type FanIn struct {
	// Embedded mutex; none of the methods visible in this file lock it.
	sync.Mutex

	// client is the redis client the subscription was created from.
	client redis.UniversalClient
	// pubsub is the redis subscription that topics are added to and removed from.
	pubsub *redis.PubSub

	// log is used to report payloads that fail to unmarshal.
	log logger.Logger

	// out is the channel carrying the decoded messages from all subscribed topics.
	out chan pubsub.Message

	// exit signals the read loop to stop.
	exit chan struct{}
}

// NewFanIn builds a FanIn on top of redisClient and immediately starts the
// background reader goroutine.
//
// The redis subscription is opened without any topics; use AddChannel to
// subscribe. Decoded messages are buffered in a channel with capacity 100,
// which is available through Consume.
func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
	fanIn := &FanIn{
		client: redisClient,
		pubsub: redisClient.Subscribe(context.Background()),
		log:    log,
		out:    make(chan pubsub.Message, 100),
		exit:   make(chan struct{}),
	}

	// The reader runs until Stop is called or the redis channel is closed.
	go fanIn.read()

	return fanIn
}

// AddChannel subscribes the underlying redis pubsub to the given topics.
// Any error reported by redis is wrapped with the "fanin_addchannel" operation.
func (fi *FanIn) AddChannel(topics ...string) error {
	const op = errors.Op("fanin_addchannel")

	if subErr := fi.pubsub.Subscribe(context.Background(), topics...); subErr != nil {
		return errors.E(op, subErr)
	}

	return nil
}

// read is the background loop started by NewFanIn. It receives messages from
// the redis pubsub subscription, unmarshals each payload as JSON into a
// pubsub.Msg and forwards the result to fi.out.
//
// The loop terminates when the redis message channel is closed or when an
// exit signal arrives on fi.exit. Payloads that fail to unmarshal are logged
// and skipped.
func (fi *FanIn) read() {
	// Resolve the subscription channel once instead of on every iteration.
	messages := fi.pubsub.Channel()

	for {
		select {
		// Here we receive the messages that we sent before in Publish.
		// The payload should be compatible with the websockets.Msg interface
		// and is carried in the payload field of the redis message.
		case msg, ok := <-messages:
			// channel closed
			if !ok {
				return
			}

			m := &pubsub.Msg{}
			if err := json.Unmarshal(utils.AsBytes(msg.Payload), m); err != nil {
				fi.log.Error("failed to unmarshal payload", "error", err.Error())
				continue
			}

			// Forward the message, but keep watching the exit signal: if out is
			// full and nobody is consuming, a plain send would block forever
			// and Stop could never get through to this goroutine.
			select {
			case fi.out <- m:
			case <-fi.exit:
				return
			}
		case <-fi.exit:
			return
		}
	}
}

// RemoveChannel unsubscribes the underlying redis pubsub from the given
// topics. Any error reported by redis is wrapped with the "fanin_remove"
// operation.
//
// NOTE(review): the topics are passed to PubSub.Unsubscribe unchanged; an
// empty list presumably unsubscribes from every topic — confirm against the
// go-redis documentation before relying on it.
func (fi *FanIn) RemoveChannel(topics ...string) error {
	const op = errors.Op("fanin_remove")

	if unsubErr := fi.pubsub.Unsubscribe(context.Background(), topics...); unsubErr != nil {
		return errors.E(op, unsubErr)
	}

	return nil
}

// Stop shuts the fan-in down. It signals the read goroutine to exit, closes
// the out channel (so receivers on Consume observe the close), closes the
// exit channel and finally closes the redis subscription created in
// NewFanIn so it is not leaked.
//
// Calling Stop again after it has completed is a no-op. Stop is not safe to
// call from several goroutines at the same time.
//
// It returns the error reported while closing the redis subscription, if any.
func (fi *FanIn) Stop() error {
	// exit is only ever closed by a completed Stop, and Stop is the only
	// sender, so a successful receive here means we are already stopped.
	select {
	case <-fi.exit:
		return nil
	default:
	}

	// Unbuffered send: once it completes, the read goroutine has taken the
	// signal and will not send on out any more, so closing out is safe.
	fi.exit <- struct{}{}
	close(fi.out)
	close(fi.exit)

	// Release the redis subscription owned by this FanIn.
	return fi.pubsub.Close()
}

// Consume returns the receive-only channel on which the decoded messages are
// delivered. The channel is closed by Stop.
func (fi *FanIn) Consume() <-chan pubsub.Message {
	return fi.out
}