summaryrefslogtreecommitdiff
path: root/plugins/redis/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r--plugins/redis/plugin.go135
1 files changed, 100 insertions, 35 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index f0011690..24ed1f92 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,8 +1,12 @@
package redis
import (
+ "context"
+ "sync"
+
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -10,72 +14,133 @@ import (
const PluginName = "redis"
type Plugin struct {
+ sync.Mutex
// config for RR integration
cfg *Config
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
+
+ fanin *FanIn
}
-func (s *Plugin) GetClient() redis.UniversalClient {
- return s.universalClient
+func (p *Plugin) GetClient() redis.UniversalClient {
+ return p.universalClient
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("redis_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: s.cfg.Addrs,
- DB: s.cfg.DB,
- Username: s.cfg.Username,
- Password: s.cfg.Password,
- SentinelPassword: s.cfg.SentinelPassword,
- MaxRetries: s.cfg.MaxRetries,
- MinRetryBackoff: s.cfg.MaxRetryBackoff,
- MaxRetryBackoff: s.cfg.MaxRetryBackoff,
- DialTimeout: s.cfg.DialTimeout,
- ReadTimeout: s.cfg.ReadTimeout,
- WriteTimeout: s.cfg.WriteTimeout,
- PoolSize: s.cfg.PoolSize,
- MinIdleConns: s.cfg.MinIdleConns,
- MaxConnAge: s.cfg.MaxConnAge,
- PoolTimeout: s.cfg.PoolTimeout,
- IdleTimeout: s.cfg.IdleTimeout,
- IdleCheckFrequency: s.cfg.IdleCheckFreq,
- ReadOnly: s.cfg.ReadOnly,
- RouteByLatency: s.cfg.RouteByLatency,
- RouteRandomly: s.cfg.RouteRandomly,
- MasterName: s.cfg.MasterName,
+ p.cfg.InitDefaults()
+ p.log = log
+
+ p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: p.cfg.Addrs,
+ DB: p.cfg.DB,
+ Username: p.cfg.Username,
+ Password: p.cfg.Password,
+ SentinelPassword: p.cfg.SentinelPassword,
+ MaxRetries: p.cfg.MaxRetries,
+ MinRetryBackoff: p.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: p.cfg.MaxRetryBackoff,
+ DialTimeout: p.cfg.DialTimeout,
+ ReadTimeout: p.cfg.ReadTimeout,
+ WriteTimeout: p.cfg.WriteTimeout,
+ PoolSize: p.cfg.PoolSize,
+ MinIdleConns: p.cfg.MinIdleConns,
+ MaxConnAge: p.cfg.MaxConnAge,
+ PoolTimeout: p.cfg.PoolTimeout,
+ IdleTimeout: p.cfg.IdleTimeout,
+ IdleCheckFrequency: p.cfg.IdleCheckFreq,
+ ReadOnly: p.cfg.ReadOnly,
+ RouteByLatency: p.cfg.RouteByLatency,
+ RouteRandomly: p.cfg.RouteRandomly,
+ MasterName: p.cfg.MasterName,
})
+ // init fanin
+ p.fanin = NewFanIn(p.universalClient, log)
+
return nil
}
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
return errCh
}
-func (s Plugin) Stop() error {
- return s.universalClient.Close()
+func (p *Plugin) Stop() error {
+ const op = errors.Op("redis_plugin_stop")
+ err := p.fanin.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ return p.fanin.AddChannel(topics...)
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ return p.fanin.RemoveChannel(topics...)
+}
+
+// Next return next message
+func (p *Plugin) Next() (pubsub.Message, error) {
+ return <-p.fanin.Consume(), nil
+}