summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-20 16:40:14 +0300
committerValery Piashchynski <[email protected]>2021-06-20 16:40:14 +0300
commit2dd30155de6faaf6005027d5337a840310c827f9 (patch)
treeaa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/redis
parent25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff)
- Update redis/memory pubsubs
- Rework internal message bus - Add new tests for the broadcast plugin and include them into the GA Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/fanin.go21
-rw-r--r--plugins/redis/pubsub.go35
2 files changed, 17 insertions, 39 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 0bdd4cf5..40a99d20 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,12 +4,10 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
-
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -23,13 +21,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *websocketsv1.Message
+ out chan *pubsub.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *websocketsv1.Message, 100)
+ out := make(chan *pubsub.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -67,14 +65,11 @@ func (fi *FanIn) read() {
return
}
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("message unmarshal")
- continue
+ fi.out <- &pubsub.Message{
+ Topic: msg.Channel,
+ Payload: utils.AsBytes(msg.Payload),
}
- fi.out <- m
case <-fi.exit:
return
}
@@ -97,6 +92,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *websocketsv1.Message {
+func (fi *FanIn) consume() <-chan *pubsub.Message {
return fi.out
}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 9c3d0134..6ab281f3 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -9,8 +9,6 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
@@ -83,41 +81,26 @@ func (p *PubSubDriver) stop() {
}()
}
-func (p *PubSubDriver) Publish(msg []byte) error {
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return errors.E(err)
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ return f.Err()
}
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- return f.Err()
- }
- }
return nil
}
-func (p *PubSubDriver) PublishAsync(msg []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- p.log.Error("message unmarshal error")
- return
- }
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ p.log.Error("redis publish", "error", f.Err())
}
}()
}
@@ -189,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
return <-p.fanin.consume(), nil
}