diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
commit | dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch) | |
tree | 6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /plugins/redis | |
parent | d2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff) |
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub
- New plugin - websockets
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/fanin.go | 100 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 135 |
2 files changed, 200 insertions, 35 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go new file mode 100644 index 00000000..29016720 --- /dev/null +++ b/plugins/redis/fanin.go @@ -0,0 +1,100 @@ +package redis + +import ( + "context" + "sync" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/utils" +) + +type FanIn struct { + sync.Mutex + + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan pubsub.Message + + exit chan struct{} +} + +func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { + out := make(chan pubsub.Message, 100) + fi := &FanIn{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (fi *FanIn) AddChannel(topics ...string) error { + const op = errors.Op("fanin_addchannel") + err := fi.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (fi *FanIn) read() { + for { + select { + //here we receive message from us (which we sent before in Publish) + //it should be compatible with the websockets.Msg interface + //payload should be in the redis.message.payload field + + case msg, ok := <-fi.pubsub.Channel(): + // channel closed + if !ok { + return + } + m := &pubsub.Msg{} + err := json.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("failed to unmarshal payload", "error", err.Error()) + continue + } + + fi.out <- m + case <-fi.exit: + return + } + } +} + +func (fi *FanIn) RemoveChannel(topics ...string) error { + const op = errors.Op("fanin_remove") + err := fi.pubsub.Unsubscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (fi *FanIn) Stop() error { + fi.exit <- struct{}{} + close(fi.out) + close(fi.exit) + return nil +} + +func (fi *FanIn) Consume() <-chan pubsub.Message { + return fi.out +} diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index f0011690..24ed1f92 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -1,8 +1,12 @@ package redis import ( + "context" + "sync" + "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -10,72 +14,133 @@ import ( const PluginName = "redis" type Plugin struct { + sync.Mutex // config for RR integration cfg *Config // logger log logger.Logger // redis universal client universalClient redis.UniversalClient + + fanin *FanIn } -func (s *Plugin) GetClient() redis.UniversalClient { - return s.universalClient +func (p *Plugin) GetClient() redis.UniversalClient { + return p.universalClient } -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("redis_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } - s.cfg.InitDefaults() - s.log = log - - s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: s.cfg.Addrs, - DB: s.cfg.DB, - Username: s.cfg.Username, - Password: s.cfg.Password, - SentinelPassword: s.cfg.SentinelPassword, - MaxRetries: s.cfg.MaxRetries, - MinRetryBackoff: s.cfg.MaxRetryBackoff, - MaxRetryBackoff: s.cfg.MaxRetryBackoff, - DialTimeout: s.cfg.DialTimeout, - ReadTimeout: s.cfg.ReadTimeout, - WriteTimeout: s.cfg.WriteTimeout, - PoolSize: s.cfg.PoolSize, - MinIdleConns: s.cfg.MinIdleConns, - MaxConnAge: s.cfg.MaxConnAge, - PoolTimeout: s.cfg.PoolTimeout, - IdleTimeout: s.cfg.IdleTimeout, - IdleCheckFrequency: s.cfg.IdleCheckFreq, - ReadOnly: s.cfg.ReadOnly, - RouteByLatency: s.cfg.RouteByLatency, - RouteRandomly: s.cfg.RouteRandomly, - MasterName: s.cfg.MasterName, + p.cfg.InitDefaults() + p.log = log + + p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: p.cfg.Addrs, + DB: p.cfg.DB, + Username: p.cfg.Username, + Password: p.cfg.Password, + SentinelPassword: p.cfg.SentinelPassword, + MaxRetries: p.cfg.MaxRetries, + MinRetryBackoff: p.cfg.MaxRetryBackoff, + MaxRetryBackoff: p.cfg.MaxRetryBackoff, + DialTimeout: p.cfg.DialTimeout, + ReadTimeout: p.cfg.ReadTimeout, + WriteTimeout: p.cfg.WriteTimeout, + PoolSize: p.cfg.PoolSize, + MinIdleConns: p.cfg.MinIdleConns, + MaxConnAge: p.cfg.MaxConnAge, + PoolTimeout: p.cfg.PoolTimeout, + IdleTimeout: p.cfg.IdleTimeout, + IdleCheckFrequency: p.cfg.IdleCheckFreq, + ReadOnly: p.cfg.ReadOnly, + RouteByLatency: p.cfg.RouteByLatency, + RouteRandomly: p.cfg.RouteRandomly, + MasterName: p.cfg.MasterName, }) + // init fanin + p.fanin = NewFanIn(p.universalClient, log) + return nil } -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) +func (p *Plugin) Serve() chan error { + errCh := make(chan error) return errCh } -func (s Plugin) Stop() error { - return s.universalClient.Close() +func (p *Plugin) Stop() error { + const op = errors.Op("redis_plugin_stop") + err := p.fanin.Stop() + if err != nil { + return errors.E(op, err) + } + + err = p.universalClient.Close() + if err != nil { + return errors.E(op, err) + } + + return nil } -func (s *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // Available interface implementation -func (s *Plugin) Available() {} +func (p *Plugin) Available() {} + +func (p *Plugin) Publish(msg []pubsub.Message) error { + p.Lock() + defer p.Unlock() + + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + if f.Err() != nil { + return f.Err() + } + } + } + return nil +} + +func (p *Plugin) PublishAsync(msg []pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error()) + continue + } + } + } + }() +} + +func (p *Plugin) Subscribe(topics ...string) error { + return p.fanin.AddChannel(topics...) +} + +func (p *Plugin) Unsubscribe(topics ...string) error { + return p.fanin.RemoveChannel(topics...) +} + +// Next return next message +func (p *Plugin) Next() (pubsub.Message, error) { + return <-p.fanin.Consume(), nil +} |