summaryrefslogtreecommitdiff
path: root/plugins/redis/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-29 00:24:30 +0300
committerValery Piashchynski <[email protected]>2021-05-29 00:24:30 +0300
commitfcda08498e8f914bbd0798da898818cd5d0e4348 (patch)
tree62d88384d07997e2373f3b273ba0cb83569ebced /plugins/redis/plugin.go
parent8f13eb958c7eec49acba6e343edb77c6ede89f09 (diff)
- Add new internal plugin - channel. Which used to deliver messages from
the ws plugin to the http directly Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r--plugins/redis/plugin.go16
1 files changed, 8 insertions, 8 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24ed1f92..c1480de8 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -101,13 +101,13 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []pubsub.Message) error {
+func (p *Plugin) Publish(msg []*pubsub.Message) error {
p.Lock()
defer p.Unlock()
for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics()); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ for j := 0; j < len(msg[i].Topics); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
if f.Err() != nil {
return f.Err()
}
@@ -116,15 +116,15 @@ func (p *Plugin) Publish(msg []pubsub.Message) error {
return nil
}
-func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics()); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ for j := 0; j < len(msg[i].Topics); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
continue
}
}
@@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (pubsub.Message, error) {
+func (p *Plugin) Next() (*pubsub.Message, error) {
return <-p.fanin.Consume(), nil
}