diff options
author | Valery Piashchynski <[email protected]> | 2021-09-01 13:19:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-01 13:19:09 +0300 |
commit | 9c8da162b3347b632f33f85d56e8c1ff7014631a (patch) | |
tree | bb614a186e05adf816b3f2b62e2d25bfa821a574 | |
parent | 0437d1f58514f694ea86e8176e621c009cd510f9 (diff) |
Code polishing before release
Signed-off-by: Valery Piashchynski <[email protected]>
35 files changed, 185 insertions, 456 deletions
@@ -35,7 +35,6 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./tests/plugins/headers go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/logger.txt -covermode=atomic ./tests/plugins/logger go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/metrics.txt -covermode=atomic ./tests/plugins/metrics - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./tests/plugins/redis go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -50,10 +49,15 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./pkg/priority_queue - go test -v -race -tags=debug ./plugins/jobs/job go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server + go test -v -race -tags=debug ./plugins/jobs/job + go test -v -race -tags=debug ./tests/plugins/jobs + go test -v -race -tags=debug ./tests/plugins/kv + go test -v -race -tags=debug ./tests/plugins/broadcast + go test -v -race -tags=debug ./tests/plugins/websockets + go test -v -race -tags=debug ./plugins/websockets go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload @@ -65,12 +69,6 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/headers go test -v -race -tags=debug ./tests/plugins/logger go test -v -race -tags=debug ./tests/plugins/metrics - go test -v -race -tags=debug ./tests/plugins/redis go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc - go test -v -race -tags=debug ./tests/plugins/kv - go test -v -race -tags=debug ./tests/plugins/broadcast - go test -v -race -tags=debug ./tests/plugins/websockets - go test -v -race -tags=debug ./plugins/websockets - go test -v -race -tags=debug ./tests/plugins/jobs docker-compose -f tests/env/docker-compose.yaml down diff --git a/common/kv/interface.go b/common/kv/interface.go index 5736a6a7..bc6a07b2 100644 --- a/common/kv/interface.go +++ b/common/kv/interface.go @@ -30,6 +30,9 @@ type Storage interface { // Delete one or multiple keys. Delete(keys ...string) error + + // Stop the storage driver + Stop() } // Constructor provides storage based on the config diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..46d596fa 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 7c161555..081d3f57 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -3,6 +3,7 @@ package boltjobs import ( "bytes" "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" @@ -18,6 +19,10 @@ func (c *consumer) listener() { c.log.Info("boltdb listener stopped") return case <-tt.C: + if atomic.LoadUint64(c.active) > uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } tx, err := c.db.Begin(true) if err != nil { c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go index ba1450cd..656d572e 100644 --- a/plugins/boltdb/boltkv/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -38,7 +38,7 @@ type Driver struct { stop chan struct{} } -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_boltdb_driver") if !cfgPlugin.Has(RootPluginName) { @@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, d := &Driver{ log: log, - stop: stop, + stop: make(chan struct{}), } err := cfgPlugin.UnmarshalKey(key, &d.cfg) @@ -411,6 +411,10 @@ func (d *Driver) Clear() error { return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ========================= PRIVATE ================================= func (d *Driver) startGCLoop() { //nolint:gocognit diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md index 317aec90..1424e586 100644 --- a/plugins/boltdb/doc/job_lifecycle.md +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -7,4 +7,3 @@ There are several boltdb buckets: get into the `InQueueBucket` waiting to acknowledgement. 3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. -`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go index 683b26f1..ad98cf3c 100644 --- a/plugins/boltdb/plugin.go +++ b/plugins/boltdb/plugin.go @@ -19,19 +19,14 @@ const ( // Plugin BoltDB K/V storage. type Plugin struct { - cfgPlugin config.Configurer + cfg config.Configurer // logger log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.stop = make(chan struct{}) p.log = log - p.cfgPlugin = cfg + p.cfg = cfg return nil } @@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } @@ -60,23 +49,20 @@ func (p *Plugin) Available() {} func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) if err != nil { return nil, errors.E(op, err) } - // save driver number to release resources after Stop - p.drivers++ - return st, nil } // JOBS bbolt implementation func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) + return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) } diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go deleted file mode 100644 index 28495abb..00000000 --- a/plugins/ephemeral/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "ephemeral" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, e, pq) -} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index c6ca96c3..a1144b85 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { + // stop all attached storages + for k := range p.storages { + p.storages[k].Stop() + } return nil } diff --git a/plugins/memcached/config.go b/plugins/memcached/memcachedkv/config.go index 6d413790..569e2573 100644 --- a/plugins/memcached/config.go +++ b/plugins/memcached/memcachedkv/config.go @@ -1,4 +1,4 @@ -package memcached +package memcachedkv type Config struct { // Addr is url for memcached, 11211 port is used by default diff --git a/plugins/memcached/driver.go b/plugins/memcached/memcachedkv/driver.go index e24747fe..6d5e1802 100644 --- a/plugins/memcached/driver.go +++ b/plugins/memcached/memcachedkv/driver.go @@ -1,4 +1,4 @@ -package memcached +package memcachedkv import ( "strings" @@ -246,3 +246,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..47bca0e2 100644 --- a/plugins/memcached/plugin.go +++ b/plugins/memcached/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv" ) const ( @@ -39,7 +40,7 @@ func (s *Plugin) Available() {} func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/ephemeral/consumer.go b/plugins/memory/memoryjobs/consumer.go index 8870bb0f..94abaadb 100644 --- a/plugins/ephemeral/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -1,4 +1,4 @@ -package ephemeral +package memoryjobs import ( "context" diff --git a/plugins/ephemeral/item.go b/plugins/memory/memoryjobs/item.go index 3298424d..8224c26b 100644 --- a/plugins/ephemeral/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -1,4 +1,4 @@ -package ephemeral +package memoryjobs import ( "context" diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go index e51d09c5..a8a8993f 100644 --- a/plugins/memory/config.go +++ b/plugins/memory/memorykv/config.go @@ -1,4 +1,4 @@ -package memory +package memorykv // Config is default config for the in-memory driver type Config struct { diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go index 68ea7266..9b3e176c 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/memorykv/kv.go @@ -1,4 +1,4 @@ -package memory +package memorykv import ( "strings" @@ -20,11 +20,11 @@ type Driver struct { cfg *Config } -func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_in_memory_driver") d := &Driver{ - stop: stop, + stop: make(chan struct{}), log: log, } @@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure return d, nil } -func (s *Driver) Has(keys ...string) (map[string]bool, error) { +func (d *Driver) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in_memory_plugin_has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.EmptyKey) } - if _, ok := s.heap.Load(keys[i]); ok { + if _, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = true } } @@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) { return m, nil } -func (s *Driver) Get(key string) ([]byte, error) { +func (d *Driver) Get(key string) ([]byte, error) { const op = errors.Op("in_memory_plugin_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) { return nil, errors.E(op, errors.EmptyKey) } - if data, exist := s.heap.Load(key); exist { + if data, exist := d.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function return data.(*kvv1.Item).Value, nil @@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) { return nil, nil } -func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("in_memory_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { m := make(map[string][]byte, len(keys)) for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { + if value, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = value.(*kvv1.Item).Value } } @@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { return m, nil } -func (s *Driver) Set(items ...*kvv1.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error { } } - s.heap.Store(items[i].Key, items[i]) + d.heap.Store(items[i].Key, items[i]) } return nil } // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*kvv1.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i] == nil { @@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { } // if key exist, overwrite it value - if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { + if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok { // check that time is correct _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { @@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &kvv1.Item{ + d.heap.Store(items[i].Key, &kvv1.Item{ Key: items[i].Key, Value: tmp.Value, Timeout: items[i].Timeout, @@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (s *Driver) TTL(keys ...string) (map[string]string, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("in_memory_plugin_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) { m := make(map[string]string, len(keys)) for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { + if item, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil } -func (s *Driver) Delete(keys ...string) error { +func (d *Driver) Delete(keys ...string) error { const op = errors.Op("in_memory_plugin_delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error { } for i := range keys { - s.heap.Delete(keys[i]) + d.heap.Delete(keys[i]) } return nil } -func (s *Driver) Clear() error { - s.clearMu.Lock() - s.heap = sync.Map{} - s.clearMu.Unlock() +func (d *Driver) Clear() error { + d.clearMu.Lock() + d.heap = sync.Map{} + d.clearMu.Unlock() return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ================================== PRIVATE ====================================== -func (s *Driver) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) +func (d *Driver) gc() { + ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second) + defer ticker.Stop() for { select { - case <-s.stop: - ticker.Stop() + case <-d.stop: return case now := <-ticker.C: // mutes needed to clear the map - s.clearMu.RLock() + d.clearMu.RLock() // check every second - s.heap.Range(func(key, value interface{}) bool { + d.heap.Range(func(key, value interface{}) bool { v := value.(*kvv1.Item) if v.Timeout == "" { return true @@ -237,13 +241,13 @@ func (s *Driver) gc() { } if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) + d.log.Debug("key deleted", "key", key) + d.heap.Delete(key) } return true }) - s.clearMu.RUnlock() + d.clearMu.RUnlock() } } } diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go index fd30eb54..75122571 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/memorypubsub/pubsub.go @@ -1,4 +1,4 @@ -package memory +package memorypubsub import ( "context" diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 7d418a70..515e469a 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,27 +2,29 @@ package memory import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/common/pubsub" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs" + "github.com/spiral/roadrunner/v2/plugins/memory/memorykv" + "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub" ) const PluginName string = "memory" type Plugin struct { - // heap is user map for the key-value pairs - stop chan struct{} - - log logger.Logger - cfgPlugin config.Configurer - drivers uint + log logger.Logger + cfg config.Configurer } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.cfgPlugin = cfg - p.stop = make(chan struct{}, 1) + p.cfg = cfg return nil } @@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} + +// Drivers implementation + func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return NewPubSubDriver(p.log, key) + return memorypubsub.NewPubSubDriver(p.log, key) } func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") - st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg) if err != nil { return nil, errors.E(op, err) } - - // save driver number to release resources after Stop - p.drivers++ - return st, nil } -func (p *Plugin) Name() string { - return PluginName +// JobsConstruct creates new ephemeral consumer from the configuration +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline creates new ephemeral consumer from the provided pipeline +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.FromPipeline(pipeline, p.log, e, pq) } diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go deleted file mode 100644 index d0a184d2..00000000 --- a/plugins/redis/clients.go +++ /dev/null @@ -1,84 +0,0 @@ -package redis - -import ( - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" -) - -// RedisClient return a client based on the provided section key -// key sample: kv.some-section.redis -// kv.redis -// redis (root) -func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) { - const op = errors.Op("redis_get_client") - - if !p.cfgPlugin.Has(key) { - return nil, errors.E(op, errors.Errorf("no such section: %s", key)) - } - - cfg := &Config{} - - err := p.cfgPlugin.UnmarshalKey(key, cfg) - if err != nil { - return nil, errors.E(op, err) - } - - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc, nil -} - -func (p *Plugin) DefaultClient() redis.UniversalClient { - cfg := &Config{} - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc -} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go deleted file mode 100644 index 189b0002..00000000 --- a/plugins/redis/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -package redis - -import "github.com/go-redis/redis/v8" - -// Redis in the redis KV plugin interface -type Redis interface { - // RedisClient provides universal redis client - RedisClient(key string) (redis.UniversalClient, error) - - // DefaultClient provide default redis client based on redis defaults - DefaultClient() redis.UniversalClient -} diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go index b41cb86c..3d062fbb 100644 --- a/plugins/redis/kv/kv.go +++ b/plugins/redis/kv/kv.go @@ -248,3 +248,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index bf9f60cc..9813344e 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -45,17 +45,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 prefetch: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 prefetch: 10000 diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 951d6227..5c521c2b 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -14,9 +14,9 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, &amqp.Plugin{}, ) assert.NoError(t, err) @@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) { &server.Plugin{}, &jobs.Plugin{}, &metrics.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, mockLogger, ) assert.NoError(t, err) @@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) { time.Sleep(time.Second * 2) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("DeclareEphemeralPipeline", declareMemoryPipe) + t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe) t.Run("PushEphemeralPipeline", pushToPipe("test-3")) time.Sleep(time.Second) t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_memory_test.go index 2890aa9d..20cbfb3f 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -26,12 +26,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestEphemeralInit(t *testing.T) { +func TestMemoryInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-init.yaml", + Path: "memory/.rr-memory-init.yaml", Prefix: "rr", } @@ -58,7 +58,7 @@ func TestEphemeralInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -112,12 +112,12 @@ func TestEphemeralInit(t *testing.T) { wg.Wait() } -func TestEphemeralDeclare(t *testing.T) { +func TestMemoryDeclare(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", + Path: "memory/.rr-memory-declare.yaml", Prefix: "rr", } @@ -135,7 +135,7 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -146,7 +146,7 @@ func TestEphemeralDeclare(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -197,25 +197,25 @@ func TestEphemeralDeclare(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() } -func TestEphemeralPauseResume(t *testing.T) { +func TestMemoryPauseResume(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", + Path: "memory/.rr-memory-pause-resume.yaml", Prefix: "rr", } @@ -231,7 +231,7 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) @@ -249,7 +249,7 @@ func TestEphemeralPauseResume(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -301,10 +301,10 @@ func TestEphemeralPauseResume(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("ephemeralPause", pausePipelines("test-local")) + t.Run("Resume", resumePipes("test-local")) + t.Run("Pause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - t.Run("ephemeralResume", resumePipes("test-local")) + t.Run("Resume", resumePipes("test-local")) t.Run("pushToEnabledPipe", pushToPipe("test-local")) time.Sleep(time.Second * 1) @@ -313,12 +313,12 @@ func TestEphemeralPauseResume(t *testing.T) { wg.Wait() } -func TestEphemeralJobsError(t *testing.T) { +func TestMemoryJobsError(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-jobs-err.yaml", + Path: "memory/.rr-memory-jobs-err.yaml", Prefix: "rr", } @@ -336,7 +336,7 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -348,7 +348,7 @@ func TestEphemeralJobsError(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -399,25 +399,25 @@ func TestEphemeralJobsError(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", resumePipes("test-3")) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second * 25) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() } -func TestEphemeralStats(t *testing.T) { +func TestMemoryStats(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", + Path: "memory/.rr-memory-declare.yaml", Prefix: "rr", } @@ -435,7 +435,7 @@ func TestEphemeralStats(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -446,7 +446,7 @@ func TestEphemeralStats(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -497,22 +497,22 @@ func TestEphemeralStats(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("PushPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} t.Run("Stats", stats(out)) assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Driver, "memory") assert.Equal(t, out.Queue, "test-3") assert.Equal(t, out.Active, int64(1)) @@ -520,14 +520,14 @@ func TestEphemeralStats(t *testing.T) { assert.Equal(t, out.Reserved, int64(0)) time.Sleep(time.Second) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) time.Sleep(time.Second * 7) out = &jobState.State{} t.Run("Stats", stats(out)) assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Driver, "memory") assert.Equal(t, out.Queue, "test-3") assert.Equal(t, out.Active, int64(0)) @@ -541,13 +541,13 @@ func TestEphemeralStats(t *testing.T) { wg.Wait() } -func declareEphemeralPipe(t *testing.T) { +func declareMemoryPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "ephemeral", + "driver": "memory", "name": "test-3", "prefetch": "10000", }} @@ -557,7 +557,7 @@ func declareEphemeralPipe(t *testing.T) { assert.NoError(t, err) } -func consumeEphemeralPipe(t *testing.T) { +func consumeMemoryPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml index 726c24ac..726c24ac 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml index 8914dfaa..9ee8afc2 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml @@ -22,12 +22,12 @@ jobs: pipelines: test-1: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-2: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml index 05dc3ffa..05dc3ffa 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml index e1b76263..1ad48237 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml @@ -25,17 +25,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 pipeline_size: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 pipeline_size: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 pipeline_size: 10000 diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml index f58de3e4..471e5c77 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml index 08b3bfad..b46bcb1c 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml index a13b591c..6407c7ad 100644 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ b/tests/plugins/kv/configs/.rr-kv-init.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: default: @@ -25,6 +28,3 @@ kv: memcached: driver: memcached addr: [ "127.0.0.1:11211" ] - -# redis: -# driver: redis diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go deleted file mode 100644 index 68da1394..00000000 --- a/tests/plugins/redis/plugin1.go +++ /dev/null @@ -1,45 +0,0 @@ -package redis - -import ( - "context" - "time" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - redisPlugin "github.com/spiral/roadrunner/v2/plugins/redis" -) - -type Plugin1 struct { - redisClient redis.UniversalClient -} - -func (p *Plugin1) Init(redis redisPlugin.Redis) error { - var err error - p.redisClient, err = redis.RedisClient("redis") - - return err -} - -func (p *Plugin1) Serve() chan error { - const op = errors.Op("plugin1 serve") - errCh := make(chan error, 1) - p.redisClient.Set(context.Background(), "foo", "bar", time.Minute) - - stringCmd := p.redisClient.Get(context.Background(), "foo") - data, err := stringCmd.Result() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - if data != "bar" { - errCh <- errors.E(op, errors.Str("no such key")) - return errCh - } - - return errCh -} - -func (p *Plugin1) Stop() error { - return nil -} diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go deleted file mode 100644 index 1b84e339..00000000 --- a/tests/plugins/redis/redis_plugin_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package redis - -import ( - "fmt" - "os" - "os/signal" - "sync" - "syscall" - "testing" - - "github.com/alicebob/miniredis/v2" - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/redis" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func redisConfig(port string) string { - cfg := ` -redis: - addrs: - - '127.0.0.1:%s' - master_name: '' - username: '' - password: '' - db: 0 - sentinel_password: '' - route_by_latency: false - route_randomly: false - dial_timeout: 0 - max_retries: 1 - min_retry_backoff: 0 - max_retry_backoff: 0 - pool_size: 0 - min_idle_conns: 0 - max_conn_age: 0 - read_timeout: 0 - write_timeout: 0 - pool_timeout: 0 - idle_timeout: 0 - idle_check_freq: 0 - read_only: false -` - return fmt.Sprintf(cfg, port) -} - -func TestRedisInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - s, err := miniredis.Run() - assert.NoError(t, err) - - c := redisConfig(s.Port()) - - cfg := &config.Viper{} - cfg.Type = "yaml" - cfg.ReadInCfg = []byte(c) - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - err = cont.RegisterAll( - cfg, - mockLogger, - &redis.Plugin{}, - &Plugin1{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - stopCh <- struct{}{} - wg.Wait() -} diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml index 5ab359d3..d256aad7 100644 --- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml +++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml @@ -1,3 +1,8 @@ logs: mode: development - level: error
\ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml index 67d935e3..d6aaa7c6 100644 --- a/tests/plugins/rpc/configs/.rr.yaml +++ b/tests/plugins/rpc/configs/.rr.yaml @@ -1,5 +1,11 @@ rpc: listen: tcp://127.0.0.1:6001 + logs: mode: development - level: error
\ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic |