summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
committerValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
commitdc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch)
tree6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /plugins/redis
parentd2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff)
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/fanin.go100
-rw-r--r--plugins/redis/plugin.go135
2 files changed, 200 insertions, 35 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
new file mode 100644
index 00000000..29016720
--- /dev/null
+++ b/plugins/redis/fanin.go
@@ -0,0 +1,100 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type FanIn struct {
+ sync.Mutex
+
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan pubsub.Message
+
+ exit chan struct{}
+}
+
+func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+ out := make(chan pubsub.Message, 100)
+ fi := &FanIn{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (fi *FanIn) AddChannel(topics ...string) error {
+ const op = errors.Op("fanin_addchannel")
+ err := fi.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (fi *FanIn) read() {
+ for {
+ select {
+ //here we receive message from us (which we sent before in Publish)
+ //it should be compatible with the websockets.Msg interface
+ //payload should be in the redis.message.payload field
+
+ case msg, ok := <-fi.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+ m := &pubsub.Msg{}
+ err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ continue
+ }
+
+ fi.out <- m
+ case <-fi.exit:
+ return
+ }
+ }
+}
+
+func (fi *FanIn) RemoveChannel(topics ...string) error {
+ const op = errors.Op("fanin_remove")
+ err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (fi *FanIn) Stop() error {
+ fi.exit <- struct{}{}
+ close(fi.out)
+ close(fi.exit)
+ return nil
+}
+
+func (fi *FanIn) Consume() <-chan pubsub.Message {
+ return fi.out
+}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index f0011690..24ed1f92 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,8 +1,12 @@
package redis
import (
+ "context"
+ "sync"
+
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -10,72 +14,133 @@ import (
const PluginName = "redis"
type Plugin struct {
+ sync.Mutex
// config for RR integration
cfg *Config
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
+
+ fanin *FanIn
}
-func (s *Plugin) GetClient() redis.UniversalClient {
- return s.universalClient
+func (p *Plugin) GetClient() redis.UniversalClient {
+ return p.universalClient
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("redis_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: s.cfg.Addrs,
- DB: s.cfg.DB,
- Username: s.cfg.Username,
- Password: s.cfg.Password,
- SentinelPassword: s.cfg.SentinelPassword,
- MaxRetries: s.cfg.MaxRetries,
- MinRetryBackoff: s.cfg.MaxRetryBackoff,
- MaxRetryBackoff: s.cfg.MaxRetryBackoff,
- DialTimeout: s.cfg.DialTimeout,
- ReadTimeout: s.cfg.ReadTimeout,
- WriteTimeout: s.cfg.WriteTimeout,
- PoolSize: s.cfg.PoolSize,
- MinIdleConns: s.cfg.MinIdleConns,
- MaxConnAge: s.cfg.MaxConnAge,
- PoolTimeout: s.cfg.PoolTimeout,
- IdleTimeout: s.cfg.IdleTimeout,
- IdleCheckFrequency: s.cfg.IdleCheckFreq,
- ReadOnly: s.cfg.ReadOnly,
- RouteByLatency: s.cfg.RouteByLatency,
- RouteRandomly: s.cfg.RouteRandomly,
- MasterName: s.cfg.MasterName,
+ p.cfg.InitDefaults()
+ p.log = log
+
+ p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: p.cfg.Addrs,
+ DB: p.cfg.DB,
+ Username: p.cfg.Username,
+ Password: p.cfg.Password,
+ SentinelPassword: p.cfg.SentinelPassword,
+ MaxRetries: p.cfg.MaxRetries,
+ MinRetryBackoff: p.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: p.cfg.MaxRetryBackoff,
+ DialTimeout: p.cfg.DialTimeout,
+ ReadTimeout: p.cfg.ReadTimeout,
+ WriteTimeout: p.cfg.WriteTimeout,
+ PoolSize: p.cfg.PoolSize,
+ MinIdleConns: p.cfg.MinIdleConns,
+ MaxConnAge: p.cfg.MaxConnAge,
+ PoolTimeout: p.cfg.PoolTimeout,
+ IdleTimeout: p.cfg.IdleTimeout,
+ IdleCheckFrequency: p.cfg.IdleCheckFreq,
+ ReadOnly: p.cfg.ReadOnly,
+ RouteByLatency: p.cfg.RouteByLatency,
+ RouteRandomly: p.cfg.RouteRandomly,
+ MasterName: p.cfg.MasterName,
})
+ // init fanin
+ p.fanin = NewFanIn(p.universalClient, log)
+
return nil
}
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
return errCh
}
-func (s Plugin) Stop() error {
- return s.universalClient.Close()
+func (p *Plugin) Stop() error {
+ const op = errors.Op("redis_plugin_stop")
+ err := p.fanin.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ return p.fanin.AddChannel(topics...)
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ return p.fanin.RemoveChannel(topics...)
+}
+
+// Next return next message
+func (p *Plugin) Next() (pubsub.Message, error) {
+ return <-p.fanin.Consume(), nil
+}