summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/channel.go97
-rw-r--r--plugins/redis/fanin.go102
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go6
-rw-r--r--plugins/redis/pubsub.go56
5 files changed, 123 insertions, 140 deletions
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
new file mode 100644
index 00000000..5817853c
--- /dev/null
+++ b/plugins/redis/channel.go
@@ -0,0 +1,97 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type redisChannel struct {
+ sync.Mutex
+
+ // redis client
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan *pubsub.Message
+
+ exit chan struct{}
+}
+
+func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel {
+ out := make(chan *pubsub.Message, 100)
+ fi := &redisChannel{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (r *redisChannel) sub(topics ...string) error {
+ const op = errors.Op("redis_sub")
+ err := r.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (r *redisChannel) read() {
+ for {
+ select {
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the pubsub.Message structure
+ // payload should be in the redis.message.payload field
+
+ case msg, ok := <-r.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+
+ r.out <- &pubsub.Message{
+ Topic: msg.Channel,
+ Payload: utils.AsBytes(msg.Payload),
+ }
+
+ case <-r.exit:
+ return
+ }
+ }
+}
+
+func (r *redisChannel) unsub(topic string) error {
+ const op = errors.Op("redis_unsub")
+ err := r.pubsub.Unsubscribe(context.Background(), topic)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (r *redisChannel) stop() error {
+ r.exit <- struct{}{}
+ close(r.out)
+ close(r.exit)
+ return nil
+}
+
+func (r *redisChannel) message() *pubsub.Message {
+ return <-r.out
+}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
deleted file mode 100644
index ac9ebcc2..00000000
--- a/plugins/redis/fanin.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package redis
-
-import (
- "context"
- "sync"
-
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type FanIn struct {
- sync.Mutex
-
- // redis client
- client redis.UniversalClient
- pubsub *redis.PubSub
-
- log logger.Logger
-
- // out channel with all subs
- out chan *websocketsv1.Message
-
- exit chan struct{}
-}
-
-func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *websocketsv1.Message, 100)
- fi := &FanIn{
- out: out,
- client: redisClient,
- pubsub: redisClient.Subscribe(context.Background()),
- exit: make(chan struct{}),
- log: log,
- }
-
- // start reading messages
- go fi.read()
-
- return fi
-}
-
-func (fi *FanIn) sub(topics ...string) error {
- const op = errors.Op("fanin_addchannel")
- err := fi.pubsub.Subscribe(context.Background(), topics...)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// read reads messages from the pubsub subscription
-func (fi *FanIn) read() {
- for {
- select {
- // here we receive message from us (which we sent before in Publish)
- // it should be compatible with the websockets.Msg interface
- // payload should be in the redis.message.payload field
-
- case msg, ok := <-fi.pubsub.Channel():
- // channel closed
- if !ok {
- return
- }
-
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("message unmarshal")
- continue
- }
-
- fi.out <- m
- case <-fi.exit:
- return
- }
- }
-}
-
-func (fi *FanIn) unsub(topic string) error {
- const op = errors.Op("fanin_remove")
- err := fi.pubsub.Unsubscribe(context.Background(), topic)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (fi *FanIn) stop() error {
- fi.exit <- struct{}{}
- close(fi.out)
- close(fi.exit)
- return nil
-}
-
-func (fi *FanIn) consume() <-chan *websocketsv1.Message {
- return fi.out
-}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 66cb8384..320b7443 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,10 +7,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24c21b55..9d98790b 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -59,8 +59,8 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-// KVProvide provides KV storage implementation over the redis plugin
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+// KVConstruct provides KV storage implementation over the redis plugin
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
@@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
return st, nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index dbda7ea4..4e41acb5 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,11 +6,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
@@ -18,7 +16,7 @@ type PubSubDriver struct {
cfg *Config `mapstructure:"redis"`
log logger.Logger
- fanin *FanIn
+ channel *redisChannel
universalClient redis.UniversalClient
stopCh chan struct{}
}
@@ -62,7 +60,12 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
MasterName: ps.cfg.MasterName,
})
- ps.fanin = newFanIn(ps.universalClient, log)
+ statusCmd := ps.universalClient.Ping(context.Background())
+ if statusCmd.Err() != nil {
+ return nil, statusCmd.Err()
+ }
+
+ ps.channel = newRedisChannel(ps.universalClient, log)
ps.stop()
@@ -72,47 +75,32 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
func (p *PubSubDriver) stop() {
go func() {
for range p.stopCh {
- _ = p.fanin.stop()
+ _ = p.channel.stop()
return
}
}()
}
-func (p *PubSubDriver) Publish(msg []byte) error {
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return errors.E(err)
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ return f.Err()
}
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- return f.Err()
- }
- }
return nil
}
-func (p *PubSubDriver) PublishAsync(msg []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- p.log.Error("message unmarshal error")
- return
- }
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ p.log.Error("redis publish", "error", f.Err())
}
}()
}
@@ -128,13 +116,13 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
return err
}
if res == 0 {
- p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
+ p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i])
continue
}
}
// and subscribe after
- return p.fanin.sub(topics...)
+ return p.channel.sub(topics...)
}
func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
@@ -160,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error
}
// else - unsubscribe
- err = p.fanin.unsub(topics[i])
+ err = p.channel.unsub(topics[i])
if err != nil {
return err
}
@@ -176,7 +164,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
panic(err)
}
- // assighn connections
+ // assign connections
// res expected to be from the sync.Pool
for k := range r {
res[k] = struct{}{}
@@ -184,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
- return <-p.fanin.consume(), nil
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
+ return p.channel.message(), nil
}