summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/pubsub.go')
-rw-r--r--plugins/redis/pubsub.go35
1 files changed, 9 insertions, 26 deletions
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 9c3d0134..6ab281f3 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -9,8 +9,6 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
@@ -83,41 +81,26 @@ func (p *PubSubDriver) stop() {
}()
}
-func (p *PubSubDriver) Publish(msg []byte) error {
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return errors.E(err)
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ return f.Err()
}
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- return f.Err()
- }
- }
return nil
}
-func (p *PubSubDriver) PublishAsync(msg []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- p.log.Error("message unmarshal error")
- return
- }
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ p.log.Error("redis publish", "error", f.Err())
}
}()
}
@@ -189,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
return <-p.fanin.consume(), nil
}