diff options
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/channel.go | 97 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 102 | ||||
-rw-r--r-- | plugins/redis/kv.go | 2 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 6 | ||||
-rw-r--r-- | plugins/redis/pubsub.go | 56 |
5 files changed, 123 insertions, 140 deletions
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go new file mode 100644 index 00000000..5817853c --- /dev/null +++ b/plugins/redis/channel.go @@ -0,0 +1,97 @@ +package redis + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +type redisChannel struct { + sync.Mutex + + // redis client + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan *pubsub.Message + + exit chan struct{} +} + +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { + out := make(chan *pubsub.Message, 100) + fi := &redisChannel{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (r *redisChannel) read() { + for { + select { + // here we receive message from us (which we sent before in Publish) + // it should be compatible with the pubsub.Message structure + // payload should be in the redis.message.payload field + + case msg, ok := <-r.pubsub.Channel(): + // channel closed + if !ok { + return + } + + r.out <- &pubsub.Message{ + Topic: msg.Channel, + Payload: utils.AsBytes(msg.Payload), + } + + case <-r.exit: + return + } + } +} + +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) + return nil +} + +func (r *redisChannel) message() *pubsub.Message { + return <-r.out +} diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go deleted file mode 100644 index ac9ebcc2..00000000 --- a/plugins/redis/fanin.go +++ /dev/null @@ -1,102 +0,0 @@ -package redis - -import ( - "context" - "sync" - - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/utils" -) - -type FanIn struct { - sync.Mutex - - // redis client - client redis.UniversalClient - pubsub *redis.PubSub - - log logger.Logger - - // out channel with all subs - out chan *websocketsv1.Message - - exit chan struct{} -} - -func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *websocketsv1.Message, 100) - fi := &FanIn{ - out: out, - client: redisClient, - pubsub: redisClient.Subscribe(context.Background()), - exit: make(chan struct{}), - log: log, - } - - // start reading messages - go fi.read() - - return fi -} - -func (fi *FanIn) sub(topics ...string) error { - const op = errors.Op("fanin_addchannel") - err := fi.pubsub.Subscribe(context.Background(), topics...) - if err != nil { - return errors.E(op, err) - } - return nil -} - -// read reads messages from the pubsub subscription -func (fi *FanIn) read() { - for { - select { - // here we receive message from us (which we sent before in Publish) - // it should be compatible with the websockets.Msg interface - // payload should be in the redis.message.payload field - - case msg, ok := <-fi.pubsub.Channel(): - // channel closed - if !ok { - return - } - - m := &websocketsv1.Message{} - err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) - if err != nil { - fi.log.Error("message unmarshal") - continue - } - - fi.out <- m - case <-fi.exit: - return - } - } -} - -func (fi *FanIn) unsub(topic string) error { - const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topic) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (fi *FanIn) stop() error { - fi.exit <- struct{}{} - close(fi.out) - close(fi.exit) - return nil -} - -func (fi *FanIn) consume() <-chan *websocketsv1.Message { - return fi.out -} diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 66cb8384..320b7443 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,10 +7,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 24c21b55..9d98790b 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -59,8 +59,8 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -// KVProvide provides KV storage implementation over the redis plugin -func (p *Plugin) KVProvide(key string) (kv.Storage, error) { +// KVConstruct provides KV storage implementation over the redis plugin +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("redis_plugin_provide") st, err := NewRedisDriver(p.log, key, p.cfgPlugin) if err != nil { @@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) { return st, nil } -func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { +func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index dbda7ea4..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,11 +6,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" ) type PubSubDriver struct { @@ -18,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -62,7 +60,12 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, MasterName: ps.cfg.MasterName, }) - ps.fanin = newFanIn(ps.universalClient, log) + statusCmd := ps.universalClient.Ping(context.Background()) + if statusCmd.Err() != nil { + return nil, statusCmd.Err() + } + + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -72,47 +75,32 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() } -func (p *PubSubDriver) Publish(msg []byte) error { +func (p *PubSubDriver) Publish(msg *pubsub.Message) error { p.Lock() defer p.Unlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return errors.E(err) + f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload) + if f.Err() != nil { + return f.Err() } - for j := 0; j < len(m.GetTopics()); j++ { - f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) - if f.Err() != nil { - return f.Err() - } - } return nil } -func (p *PubSubDriver) PublishAsync(msg []byte) { +func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { go func() { p.Lock() defer p.Unlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - p.log.Error("message unmarshal error") - return - } - for j := 0; j < len(m.GetTopics()); j++ { - f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) - if f.Err() != nil { - p.log.Error("redis publish", "error", f.Err()) - } + f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload) + if f.Err() != nil { + p.log.Error("redis publish", "error", f.Err()) } }() } @@ -128,13 +116,13 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { return err } if res == 0 { - p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) + p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i]) continue } } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -160,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -176,7 +164,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { panic(err) } - // assighn connections + // assign connections // res expected to be from the sync.Pool for k := range r { res[k] = struct{}{} @@ -184,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { - return <-p.fanin.consume(), nil +func (p *PubSubDriver) Next() (*pubsub.Message, error) { + return p.channel.message(), nil } |