diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
commit | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch) | |
tree | d940de0ee304d3edb60daa35568c3f186dc6a8b5 /plugins/redis | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) |
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/fanin.go | 25 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 42 |
2 files changed, 35 insertions, 32 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 93b13124..6c9a5650 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,8 +4,7 @@ import ( "context" "sync" - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/go-redis/redis/v8" @@ -22,13 +21,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *pubsub.Message + out chan *message.Message exit chan struct{} } func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *pubsub.Message, 100) + out := make(chan *message.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -65,14 +64,14 @@ func (fi *FanIn) read() { if !ok { return } - m := &pubsub.Message{} - err := json.Unmarshal(utils.AsBytes(msg.Payload), m) - if err != nil { - fi.log.Error("failed to unmarshal payload", "error", err.Error()) - continue - } - - fi.out <- m + //m := &pubsub.Message{} + //err := json.Unmarshal(utils.AsBytes(msg.Payload), m) + //if err != nil { + // fi.log.Error("failed to unmarshal payload", "error", err.Error()) + // continue + //} + + fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) case <-fi.exit: return } @@ -95,6 +94,6 @@ func (fi *FanIn) Stop() error { return nil } -func (fi *FanIn) Consume() <-chan *pubsub.Message { +func (fi *FanIn) Consume() <-chan *message.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index c1480de8..3a21204e 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -7,8 +7,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const PluginName = "redis" @@ -101,34 +103,36 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - return f.Err() - } + fbsMsg := message.GetRootAsMessage(msg, 0) + + for j := 0; j < fbsMsg.TopicsLength(); j++ { + t := fbsMsg.Table() + vec := t.ByteVector(0) + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec) + if f.Err() != nil { + return f.Err() } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { +func (p *Plugin) PublishAsync(msg []byte) { go func() { - p.Lock() - defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - continue - } - } - } + //p.Lock() + //defer p.Unlock() + //for i := 0; i < len(msg); i++ { + // for j := 0; j < len(msg[i].Topics); j++ { + // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) + // if f.Err() != nil { + // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) + // continue + // } + // } + //} }() } |