From fcda08498e8f914bbd0798da898818cd5d0e4348 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 29 May 2021 00:24:30 +0300 Subject: - Add new internal plugin - channel. Which used to deliver messages from the ws plugin to the http directly Signed-off-by: Valery Piashchynski --- plugins/redis/fanin.go | 8 ++++---- plugins/redis/plugin.go | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) (limited to 'plugins/redis') diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 8e924b2d..93b13124 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -22,13 +22,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan pubsub.Message + out chan *pubsub.Message exit chan struct{} } func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan pubsub.Message, 100) + out := make(chan *pubsub.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -65,7 +65,7 @@ func (fi *FanIn) read() { if !ok { return } - m := &pubsub.Msg{} + m := &pubsub.Message{} err := json.Unmarshal(utils.AsBytes(msg.Payload), m) if err != nil { fi.log.Error("failed to unmarshal payload", "error", err.Error()) @@ -95,6 +95,6 @@ func (fi *FanIn) Stop() error { return nil } -func (fi *FanIn) Consume() <-chan pubsub.Message { +func (fi *FanIn) Consume() <-chan *pubsub.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 24ed1f92..c1480de8 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -101,13 +101,13 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []pubsub.Message) error { +func (p *Plugin) Publish(msg []*pubsub.Message) error { p.Lock() defer p.Unlock() for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics()); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + for j := 0; j < len(msg[i].Topics); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) if f.Err() != nil { return f.Err() } @@ -116,15 +116,15 @@ func (p *Plugin) Publish(msg []pubsub.Message) error { return nil } -func (p *Plugin) PublishAsync(msg []pubsub.Message) { +func (p *Plugin) PublishAsync(msg []*pubsub.Message) { go func() { p.Lock() defer p.Unlock() for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics()); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + for j := 0; j < len(msg[i].Topics); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) if f.Err() != nil { - p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error()) + p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) continue } } @@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (pubsub.Message, error) { +func (p *Plugin) Next() (*pubsub.Message, error) { return <-p.fanin.Consume(), nil } -- cgit v1.2.3