summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/fanin.go8
-rw-r--r--plugins/redis/plugin.go16
2 files changed, 12 insertions, 12 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 8e924b2d..93b13124 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -22,13 +22,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan pubsub.Message
+ out chan *pubsub.Message
exit chan struct{}
}
func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan pubsub.Message, 100)
+ out := make(chan *pubsub.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,7 +65,7 @@ func (fi *FanIn) read() {
if !ok {
return
}
- m := &pubsub.Msg{}
+ m := &pubsub.Message{}
err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
if err != nil {
fi.log.Error("failed to unmarshal payload", "error", err.Error())
@@ -95,6 +95,6 @@ func (fi *FanIn) Stop() error {
return nil
}
-func (fi *FanIn) Consume() <-chan pubsub.Message {
+func (fi *FanIn) Consume() <-chan *pubsub.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24ed1f92..c1480de8 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -101,13 +101,13 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []pubsub.Message) error {
+func (p *Plugin) Publish(msg []*pubsub.Message) error {
p.Lock()
defer p.Unlock()
for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics()); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ for j := 0; j < len(msg[i].Topics); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
if f.Err() != nil {
return f.Err()
}
@@ -116,15 +116,15 @@ func (p *Plugin) Publish(msg []pubsub.Message) error {
return nil
}
-func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics()); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ for j := 0; j < len(msg[i].Topics); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
continue
}
}
@@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (pubsub.Message, error) {
+func (p *Plugin) Next() (*pubsub.Message, error) {
return <-p.fanin.Consume(), nil
}