diff options
author | Valery Piashchynski <[email protected]> | 2021-05-29 00:24:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-29 00:24:30 +0300 |
commit | fcda08498e8f914bbd0798da898818cd5d0e4348 (patch) | |
tree | 62d88384d07997e2373f3b273ba0cb83569ebced /plugins/redis/plugin.go | |
parent | 8f13eb958c7eec49acba6e343edb77c6ede89f09 (diff) |
- Add new internal plugin - channel. Which used to deliver messages from
the ws plugin to the http directly
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r-- | plugins/redis/plugin.go | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 24ed1f92..c1480de8 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -101,13 +101,13 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []pubsub.Message) error { +func (p *Plugin) Publish(msg []*pubsub.Message) error { p.Lock() defer p.Unlock() for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics()); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + for j := 0; j < len(msg[i].Topics); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) if f.Err() != nil { return f.Err() } @@ -116,15 +116,15 @@ func (p *Plugin) Publish(msg []pubsub.Message) error { return nil } -func (p *Plugin) PublishAsync(msg []pubsub.Message) { +func (p *Plugin) PublishAsync(msg []*pubsub.Message) { go func() { p.Lock() defer p.Unlock() for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics()); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + for j := 0; j < len(msg[i].Topics); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) if f.Err() != nil { - p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error()) + p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) continue } } @@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (pubsub.Message, error) { +func (p *Plugin) Next() (*pubsub.Message, error) { return <-p.fanin.Consume(), nil } |