summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-01 13:19:09 +0300
committerValery Piashchynski <[email protected]>2021-09-01 13:19:09 +0300
commit9c8da162b3347b632f33f85d56e8c1ff7014631a (patch)
treebb614a186e05adf816b3f2b62e2d25bfa821a574
parent0437d1f58514f694ea86e8176e621c009cd510f9 (diff)
Code polishing before release
Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xMakefile14
-rw-r--r--common/kv/interface.go3
-rw-r--r--plugins/boltdb/boltjobs/consumer.go2
-rw-r--r--plugins/boltdb/boltjobs/listener.go5
-rw-r--r--plugins/boltdb/boltkv/driver.go8
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md1
-rw-r--r--plugins/boltdb/plugin.go24
-rw-r--r--plugins/ephemeral/plugin.go41
-rw-r--r--plugins/kv/plugin.go4
-rw-r--r--plugins/memcached/memcachedkv/config.go (renamed from plugins/memcached/config.go)2
-rw-r--r--plugins/memcached/memcachedkv/driver.go (renamed from plugins/memcached/driver.go)4
-rw-r--r--plugins/memcached/plugin.go3
-rw-r--r--plugins/memory/memoryjobs/consumer.go (renamed from plugins/ephemeral/consumer.go)2
-rw-r--r--plugins/memory/memoryjobs/item.go (renamed from plugins/ephemeral/item.go)2
-rw-r--r--plugins/memory/memorykv/config.go (renamed from plugins/memory/config.go)2
-rw-r--r--plugins/memory/memorykv/kv.go (renamed from plugins/memory/kv.go)66
-rw-r--r--plugins/memory/memorypubsub/pubsub.go (renamed from plugins/memory/pubsub.go)2
-rw-r--r--plugins/memory/plugin.go50
-rw-r--r--plugins/redis/clients.go84
-rw-r--r--plugins/redis/interface.go12
-rw-r--r--plugins/redis/kv/kv.go2
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml6
-rw-r--r--tests/plugins/jobs/jobs_general_test.go10
-rw-r--r--tests/plugins/jobs/jobs_memory_test.go (renamed from tests/plugins/jobs/jobs_ephemeral_test.go)90
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-declare.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml)0
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-init.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml)4
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml)0
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml)6
-rw-r--r--tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml3
-rw-r--r--tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml3
-rw-r--r--tests/plugins/kv/configs/.rr-kv-init.yaml6
-rw-r--r--tests/plugins/redis/plugin1.go45
-rw-r--r--tests/plugins/redis/redis_plugin_test.go120
-rw-r--r--tests/plugins/rpc/configs/.rr-rpc-disabled.yaml7
-rw-r--r--tests/plugins/rpc/configs/.rr.yaml8
35 files changed, 185 insertions, 456 deletions
diff --git a/Makefile b/Makefile
index 389c9014..1de45451 100755
--- a/Makefile
+++ b/Makefile
@@ -35,7 +35,6 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./tests/plugins/headers
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/logger.txt -covermode=atomic ./tests/plugins/logger
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/metrics.txt -covermode=atomic ./tests/plugins/metrics
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./tests/plugins/redis
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
@@ -50,10 +49,15 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/worker_watcher
go test -v -race -tags=debug ./pkg/bst
go test -v -race -tags=debug ./pkg/priority_queue
- go test -v -race -tags=debug ./plugins/jobs/job
go test -v -race -tags=debug ./plugins/jobs/pipeline
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./plugins/server
+ go test -v -race -tags=debug ./plugins/jobs/job
+ go test -v -race -tags=debug ./tests/plugins/jobs
+ go test -v -race -tags=debug ./tests/plugins/kv
+ go test -v -race -tags=debug ./tests/plugins/broadcast
+ go test -v -race -tags=debug ./tests/plugins/websockets
+ go test -v -race -tags=debug ./plugins/websockets
go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
@@ -65,12 +69,6 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/headers
go test -v -race -tags=debug ./tests/plugins/logger
go test -v -race -tags=debug ./tests/plugins/metrics
- go test -v -race -tags=debug ./tests/plugins/redis
go test -v -race -tags=debug ./tests/plugins/resetter
go test -v -race -tags=debug ./tests/plugins/rpc
- go test -v -race -tags=debug ./tests/plugins/kv
- go test -v -race -tags=debug ./tests/plugins/broadcast
- go test -v -race -tags=debug ./tests/plugins/websockets
- go test -v -race -tags=debug ./plugins/websockets
- go test -v -race -tags=debug ./tests/plugins/jobs
docker-compose -f tests/env/docker-compose.yaml down
diff --git a/common/kv/interface.go b/common/kv/interface.go
index 5736a6a7..bc6a07b2 100644
--- a/common/kv/interface.go
+++ b/common/kv/interface.go
@@ -30,6 +30,9 @@ type Storage interface {
// Delete one or multiple keys.
Delete(keys ...string) error
+
+ // Stop the storage driver
+ Stop()
}
// Constructor provides storage based on the config
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index ed0eda61..46d596fa 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return &consumer{
file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
- prefetch: pipeline.Int(prefetch, 100),
+ prefetch: pipeline.Int(prefetch, 1000),
permissions: conf.Permissions,
bPool: sync.Pool{New: func() interface{} {
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 7c161555..081d3f57 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -3,6 +3,7 @@ package boltjobs
import (
"bytes"
"encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
@@ -18,6 +19,10 @@ func (c *consumer) listener() {
c.log.Info("boltdb listener stopped")
return
case <-tt.C:
+ if atomic.LoadUint64(c.active) > uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
tx, err := c.db.Begin(true)
if err != nil {
c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go
index ba1450cd..656d572e 100644
--- a/plugins/boltdb/boltkv/driver.go
+++ b/plugins/boltdb/boltkv/driver.go
@@ -38,7 +38,7 @@ type Driver struct {
stop chan struct{}
}
-func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
if !cfgPlugin.Has(RootPluginName) {
@@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
d := &Driver{
log: log,
- stop: stop,
+ stop: make(chan struct{}),
}
err := cfgPlugin.UnmarshalKey(key, &d.cfg)
@@ -411,6 +411,10 @@ func (d *Driver) Clear() error {
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ========================= PRIVATE =================================
func (d *Driver) startGCLoop() { //nolint:gocognit
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
index 317aec90..1424e586 100644
--- a/plugins/boltdb/doc/job_lifecycle.md
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -7,4 +7,3 @@ There are several boltdb buckets:
get into the `InQueueBucket` waiting to acknowledgement.
3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
-``
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
index 683b26f1..ad98cf3c 100644
--- a/plugins/boltdb/plugin.go
+++ b/plugins/boltdb/plugin.go
@@ -19,19 +19,14 @@ const (
// Plugin BoltDB K/V storage.
type Plugin struct {
- cfgPlugin config.Configurer
+ cfg config.Configurer
// logger
log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.stop = make(chan struct{})
p.log = log
- p.cfgPlugin = cfg
+ p.cfg = cfg
return nil
}
@@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
@@ -60,23 +49,20 @@ func (p *Plugin) Available() {}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
// JOBS bbolt implementation
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue)
}
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue)
}
diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go
deleted file mode 100644
index 28495abb..00000000
--- a/plugins/ephemeral/plugin.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "ephemeral"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Available() {}
-
-// JobsConstruct creates new ephemeral consumer from the configuration
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewJobBroker(configKey, p.log, p.cfg, e, pq)
-}
-
-// FromPipeline creates new ephemeral consumer from the provided pipeline
-func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipeline, p.log, e, pq)
-}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index c6ca96c3..a1144b85 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
+ // stop all attached storages
+ for k := range p.storages {
+ p.storages[k].Stop()
+ }
return nil
}
diff --git a/plugins/memcached/config.go b/plugins/memcached/memcachedkv/config.go
index 6d413790..569e2573 100644
--- a/plugins/memcached/config.go
+++ b/plugins/memcached/memcachedkv/config.go
@@ -1,4 +1,4 @@
-package memcached
+package memcachedkv
type Config struct {
// Addr is url for memcached, 11211 port is used by default
diff --git a/plugins/memcached/driver.go b/plugins/memcached/memcachedkv/driver.go
index e24747fe..6d5e1802 100644
--- a/plugins/memcached/driver.go
+++ b/plugins/memcached/memcachedkv/driver.go
@@ -1,4 +1,4 @@
-package memcached
+package memcachedkv
import (
"strings"
@@ -246,3 +246,5 @@ func (d *Driver) Clear() error {
return nil
}
+
+func (d *Driver) Stop() {}
diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go
index 59a2b7cb..47bca0e2 100644
--- a/plugins/memcached/plugin.go
+++ b/plugins/memcached/plugin.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv"
)
const (
@@ -39,7 +40,7 @@ func (s *Plugin) Available() {}
func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+ st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/plugins/ephemeral/consumer.go b/plugins/memory/memoryjobs/consumer.go
index 8870bb0f..94abaadb 100644
--- a/plugins/ephemeral/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -1,4 +1,4 @@
-package ephemeral
+package memoryjobs
import (
"context"
diff --git a/plugins/ephemeral/item.go b/plugins/memory/memoryjobs/item.go
index 3298424d..8224c26b 100644
--- a/plugins/ephemeral/item.go
+++ b/plugins/memory/memoryjobs/item.go
@@ -1,4 +1,4 @@
-package ephemeral
+package memoryjobs
import (
"context"
diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go
index e51d09c5..a8a8993f 100644
--- a/plugins/memory/config.go
+++ b/plugins/memory/memorykv/config.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
// Config is default config for the in-memory driver
type Config struct {
diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go
index 68ea7266..9b3e176c 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/memorykv/kv.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
import (
"strings"
@@ -20,11 +20,11 @@ type Driver struct {
cfg *Config
}
-func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_in_memory_driver")
d := &Driver{
- stop: stop,
+ stop: make(chan struct{}),
log: log,
}
@@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure
return d, nil
}
-func (s *Driver) Has(keys ...string) (map[string]bool, error) {
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in_memory_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if _, ok := s.heap.Load(keys[i]); ok {
+ if _, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = true
}
}
@@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return m, nil
}
-func (s *Driver) Get(key string) ([]byte, error) {
+func (d *Driver) Get(key string) ([]byte, error) {
const op = errors.Op("in_memory_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if data, exist := s.heap.Load(key); exist {
+ if data, exist := d.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
return data.(*kvv1.Item).Value, nil
@@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, nil
}
-func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
m := make(map[string][]byte, len(keys))
for i := range keys {
- if value, ok := s.heap.Load(keys[i]); ok {
+ if value, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = value.(*kvv1.Item).Value
}
}
@@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
return m, nil
}
-func (s *Driver) Set(items ...*kvv1.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error {
}
}
- s.heap.Store(items[i].Key, items[i])
+ d.heap.Store(items[i].Key, items[i])
}
return nil
}
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*kvv1.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
+ if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
_, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
@@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &kvv1.Item{
+ d.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (s *Driver) TTL(keys ...string) (map[string]string, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) {
m := make(map[string]string, len(keys))
for i := range keys {
- if item, ok := s.heap.Load(keys[i]); ok {
+ if item, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
}
-func (s *Driver) Delete(keys ...string) error {
+func (d *Driver) Delete(keys ...string) error {
const op = errors.Op("in_memory_plugin_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error {
}
for i := range keys {
- s.heap.Delete(keys[i])
+ d.heap.Delete(keys[i])
}
return nil
}
-func (s *Driver) Clear() error {
- s.clearMu.Lock()
- s.heap = sync.Map{}
- s.clearMu.Unlock()
+func (d *Driver) Clear() error {
+ d.clearMu.Lock()
+ d.heap = sync.Map{}
+ d.clearMu.Unlock()
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ================================== PRIVATE ======================================
-func (s *Driver) gc() {
- ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+func (d *Driver) gc() {
+ ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second)
+ defer ticker.Stop()
for {
select {
- case <-s.stop:
- ticker.Stop()
+ case <-d.stop:
return
case now := <-ticker.C:
// mutes needed to clear the map
- s.clearMu.RLock()
+ d.clearMu.RLock()
// check every second
- s.heap.Range(func(key, value interface{}) bool {
+ d.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
@@ -237,13 +241,13 @@ func (s *Driver) gc() {
}
if now.After(t) {
- s.log.Debug("key deleted", "key", key)
- s.heap.Delete(key)
+ d.log.Debug("key deleted", "key", key)
+ d.heap.Delete(key)
}
return true
})
- s.clearMu.RUnlock()
+ d.clearMu.RUnlock()
}
}
}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
index fd30eb54..75122571 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -1,4 +1,4 @@
-package memory
+package memorypubsub
import (
"context"
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
)
const PluginName string = "memory"
type Plugin struct {
- // heap is user map for the key-value pairs
- stop chan struct{}
-
- log logger.Logger
- cfgPlugin config.Configurer
- drivers uint
+ log logger.Logger
+ cfg config.Configurer
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.cfgPlugin = cfg
- p.stop = make(chan struct{}, 1)
+ p.cfg = cfg
return nil
}
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key)
+ return memorypubsub.NewPubSubDriver(p.log, key)
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
- st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
-
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
-func (p *Plugin) Name() string {
- return PluginName
+// JobsConstruct creates new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
}
diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go
deleted file mode 100644
index d0a184d2..00000000
--- a/plugins/redis/clients.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package redis
-
-import (
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
-)
-
-// RedisClient return a client based on the provided section key
-// key sample: kv.some-section.redis
-// kv.redis
-// redis (root)
-func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) {
- const op = errors.Op("redis_get_client")
-
- if !p.cfgPlugin.Has(key) {
- return nil, errors.E(op, errors.Errorf("no such section: %s", key))
- }
-
- cfg := &Config{}
-
- err := p.cfgPlugin.UnmarshalKey(key, cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- cfg.InitDefaults()
-
- uc := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: cfg.Addrs,
- DB: cfg.DB,
- Username: cfg.Username,
- Password: cfg.Password,
- SentinelPassword: cfg.SentinelPassword,
- MaxRetries: cfg.MaxRetries,
- MinRetryBackoff: cfg.MaxRetryBackoff,
- MaxRetryBackoff: cfg.MaxRetryBackoff,
- DialTimeout: cfg.DialTimeout,
- ReadTimeout: cfg.ReadTimeout,
- WriteTimeout: cfg.WriteTimeout,
- PoolSize: cfg.PoolSize,
- MinIdleConns: cfg.MinIdleConns,
- MaxConnAge: cfg.MaxConnAge,
- PoolTimeout: cfg.PoolTimeout,
- IdleTimeout: cfg.IdleTimeout,
- IdleCheckFrequency: cfg.IdleCheckFreq,
- ReadOnly: cfg.ReadOnly,
- RouteByLatency: cfg.RouteByLatency,
- RouteRandomly: cfg.RouteRandomly,
- MasterName: cfg.MasterName,
- })
-
- return uc, nil
-}
-
-func (p *Plugin) DefaultClient() redis.UniversalClient {
- cfg := &Config{}
- cfg.InitDefaults()
-
- uc := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: cfg.Addrs,
- DB: cfg.DB,
- Username: cfg.Username,
- Password: cfg.Password,
- SentinelPassword: cfg.SentinelPassword,
- MaxRetries: cfg.MaxRetries,
- MinRetryBackoff: cfg.MaxRetryBackoff,
- MaxRetryBackoff: cfg.MaxRetryBackoff,
- DialTimeout: cfg.DialTimeout,
- ReadTimeout: cfg.ReadTimeout,
- WriteTimeout: cfg.WriteTimeout,
- PoolSize: cfg.PoolSize,
- MinIdleConns: cfg.MinIdleConns,
- MaxConnAge: cfg.MaxConnAge,
- PoolTimeout: cfg.PoolTimeout,
- IdleTimeout: cfg.IdleTimeout,
- IdleCheckFrequency: cfg.IdleCheckFreq,
- ReadOnly: cfg.ReadOnly,
- RouteByLatency: cfg.RouteByLatency,
- RouteRandomly: cfg.RouteRandomly,
- MasterName: cfg.MasterName,
- })
-
- return uc
-}
diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go
deleted file mode 100644
index 189b0002..00000000
--- a/plugins/redis/interface.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package redis
-
-import "github.com/go-redis/redis/v8"
-
-// Redis in the redis KV plugin interface
-type Redis interface {
- // RedisClient provides universal redis client
- RedisClient(key string) (redis.UniversalClient, error)
-
- // DefaultClient provide default redis client based on redis defaults
- DefaultClient() redis.UniversalClient
-}
diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go
index b41cb86c..3d062fbb 100644
--- a/plugins/redis/kv/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -248,3 +248,5 @@ func (d *Driver) Clear() error {
return nil
}
+
+func (d *Driver) Stop() {}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index bf9f60cc..9813344e 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -45,17 +45,17 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
test-local:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
test-local-2:
- driver: ephemeral
+ driver: memory
priority: 1
prefetch: 10000
test-local-3:
- driver: ephemeral
+ driver: memory
priority: 2
prefetch: 10000
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go
index 951d6227..5c521c2b 100644
--- a/tests/plugins/jobs/jobs_general_test.go
+++ b/tests/plugins/jobs/jobs_general_test.go
@@ -14,9 +14,9 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/amqp"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/metrics"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
@@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
&amqp.Plugin{},
)
assert.NoError(t, err)
@@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) {
&server.Plugin{},
&jobs.Plugin{},
&metrics.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
mockLogger,
)
assert.NoError(t, err)
@@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) {
time.Sleep(time.Second * 2)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("DeclareEphemeralPipeline", declareMemoryPipe)
+ t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe)
t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_memory_test.go
index 2890aa9d..20cbfb3f 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_memory_test.go
@@ -15,9 +15,9 @@ import (
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -26,12 +26,12 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestEphemeralInit(t *testing.T) {
+func TestMemoryInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-init.yaml",
+ Path: "memory/.rr-memory-init.yaml",
Prefix: "rr",
}
@@ -58,7 +58,7 @@ func TestEphemeralInit(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -112,12 +112,12 @@ func TestEphemeralInit(t *testing.T) {
wg.Wait()
}
-func TestEphemeralDeclare(t *testing.T) {
+func TestMemoryDeclare(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Path: "memory/.rr-memory-declare.yaml",
Prefix: "rr",
}
@@ -135,7 +135,7 @@ func TestEphemeralDeclare(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -146,7 +146,7 @@ func TestEphemeralDeclare(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -197,25 +197,25 @@ func TestEphemeralDeclare(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
}
-func TestEphemeralPauseResume(t *testing.T) {
+func TestMemoryPauseResume(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-pause-resume.yaml",
+ Path: "memory/.rr-memory-pause-resume.yaml",
Prefix: "rr",
}
@@ -231,7 +231,7 @@ func TestEphemeralPauseResume(t *testing.T) {
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
@@ -249,7 +249,7 @@ func TestEphemeralPauseResume(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -301,10 +301,10 @@ func TestEphemeralPauseResume(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("ephemeralResume", resumePipes("test-local"))
- t.Run("ephemeralPause", pausePipelines("test-local"))
+ t.Run("Resume", resumePipes("test-local"))
+ t.Run("Pause", pausePipelines("test-local"))
t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
- t.Run("ephemeralResume", resumePipes("test-local"))
+ t.Run("Resume", resumePipes("test-local"))
t.Run("pushToEnabledPipe", pushToPipe("test-local"))
time.Sleep(time.Second * 1)
@@ -313,12 +313,12 @@ func TestEphemeralPauseResume(t *testing.T) {
wg.Wait()
}
-func TestEphemeralJobsError(t *testing.T) {
+func TestMemoryJobsError(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-jobs-err.yaml",
+ Path: "memory/.rr-memory-jobs-err.yaml",
Prefix: "rr",
}
@@ -336,7 +336,7 @@ func TestEphemeralJobsError(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -348,7 +348,7 @@ func TestEphemeralJobsError(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -399,25 +399,25 @@ func TestEphemeralJobsError(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", resumePipes("test-3"))
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second * 25)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
}
-func TestEphemeralStats(t *testing.T) {
+func TestMemoryStats(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Path: "memory/.rr-memory-declare.yaml",
Prefix: "rr",
}
@@ -435,7 +435,7 @@ func TestEphemeralStats(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -446,7 +446,7 @@ func TestEphemeralStats(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -497,22 +497,22 @@ func TestEphemeralStats(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("PushPipeline", pushToPipeDelayed("test-3", 5))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
out := &jobState.State{}
t.Run("Stats", stats(out))
assert.Equal(t, out.Pipeline, "test-3")
- assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Driver, "memory")
assert.Equal(t, out.Queue, "test-3")
assert.Equal(t, out.Active, int64(1))
@@ -520,14 +520,14 @@ func TestEphemeralStats(t *testing.T) {
assert.Equal(t, out.Reserved, int64(0))
time.Sleep(time.Second)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
time.Sleep(time.Second * 7)
out = &jobState.State{}
t.Run("Stats", stats(out))
assert.Equal(t, out.Pipeline, "test-3")
- assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Driver, "memory")
assert.Equal(t, out.Queue, "test-3")
assert.Equal(t, out.Active, int64(0))
@@ -541,13 +541,13 @@ func TestEphemeralStats(t *testing.T) {
wg.Wait()
}
-func declareEphemeralPipe(t *testing.T) {
+func declareMemoryPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "ephemeral",
+ "driver": "memory",
"name": "test-3",
"prefetch": "10000",
}}
@@ -557,7 +557,7 @@ func declareEphemeralPipe(t *testing.T) {
assert.NoError(t, err)
}
-func consumeEphemeralPipe(t *testing.T) {
+func consumeMemoryPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
index 726c24ac..726c24ac 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml
index 8914dfaa..9ee8afc2 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml
@@ -22,12 +22,12 @@ jobs:
pipelines:
test-1:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
test-2:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
index 05dc3ffa..05dc3ffa 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
index e1b76263..1ad48237 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
@@ -25,17 +25,17 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
test-local:
- driver: ephemeral
+ driver: memory
priority: 10
pipeline_size: 10000
test-local-2:
- driver: ephemeral
+ driver: memory
priority: 1
pipeline_size: 10000
test-local-3:
- driver: ephemeral
+ driver: memory
priority: 2
pipeline_size: 10000
diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml
index f58de3e4..471e5c77 100644
--- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml
+++ b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml
@@ -1,6 +1,9 @@
rpc:
listen: tcp://127.0.0.1:6001
+logs:
+ mode: development
+ level: error
kv:
boltdb-south:
diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml
index 08b3bfad..b46bcb1c 100644
--- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml
+++ b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml
@@ -1,6 +1,9 @@
rpc:
listen: tcp://127.0.0.1:6001
+logs:
+ mode: development
+ level: error
kv:
boltdb-south:
diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml
index a13b591c..6407c7ad 100644
--- a/tests/plugins/kv/configs/.rr-kv-init.yaml
+++ b/tests/plugins/kv/configs/.rr-kv-init.yaml
@@ -1,6 +1,9 @@
rpc:
listen: tcp://127.0.0.1:6001
+logs:
+ mode: development
+ level: error
kv:
default:
@@ -25,6 +28,3 @@ kv:
memcached:
driver: memcached
addr: [ "127.0.0.1:11211" ]
-
-# redis:
-# driver: redis
diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go
deleted file mode 100644
index 68da1394..00000000
--- a/tests/plugins/redis/plugin1.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package redis
-
-import (
- "context"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- redisPlugin "github.com/spiral/roadrunner/v2/plugins/redis"
-)
-
-type Plugin1 struct {
- redisClient redis.UniversalClient
-}
-
-func (p *Plugin1) Init(redis redisPlugin.Redis) error {
- var err error
- p.redisClient, err = redis.RedisClient("redis")
-
- return err
-}
-
-func (p *Plugin1) Serve() chan error {
- const op = errors.Op("plugin1 serve")
- errCh := make(chan error, 1)
- p.redisClient.Set(context.Background(), "foo", "bar", time.Minute)
-
- stringCmd := p.redisClient.Get(context.Background(), "foo")
- data, err := stringCmd.Result()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- if data != "bar" {
- errCh <- errors.E(op, errors.Str("no such key"))
- return errCh
- }
-
- return errCh
-}
-
-func (p *Plugin1) Stop() error {
- return nil
-}
diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go
deleted file mode 100644
index 1b84e339..00000000
--- a/tests/plugins/redis/redis_plugin_test.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package redis
-
-import (
- "fmt"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "testing"
-
- "github.com/alicebob/miniredis/v2"
- "github.com/golang/mock/gomock"
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/redis"
- "github.com/spiral/roadrunner/v2/tests/mocks"
- "github.com/stretchr/testify/assert"
-)
-
-func redisConfig(port string) string {
- cfg := `
-redis:
- addrs:
- - '127.0.0.1:%s'
- master_name: ''
- username: ''
- password: ''
- db: 0
- sentinel_password: ''
- route_by_latency: false
- route_randomly: false
- dial_timeout: 0
- max_retries: 1
- min_retry_backoff: 0
- max_retry_backoff: 0
- pool_size: 0
- min_idle_conns: 0
- max_conn_age: 0
- read_timeout: 0
- write_timeout: 0
- pool_timeout: 0
- idle_timeout: 0
- idle_check_freq: 0
- read_only: false
-`
- return fmt.Sprintf(cfg, port)
-}
-
-func TestRedisInit(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- if err != nil {
- t.Fatal(err)
- }
-
- s, err := miniredis.Run()
- assert.NoError(t, err)
-
- c := redisConfig(s.Port())
-
- cfg := &config.Viper{}
- cfg.Type = "yaml"
- cfg.ReadInCfg = []byte(c)
-
- controller := gomock.NewController(t)
- mockLogger := mocks.NewMockLogger(controller)
-
- err = cont.RegisterAll(
- cfg,
- mockLogger,
- &redis.Plugin{},
- &Plugin1{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- assert.NoError(t, err)
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- stopCh <- struct{}{}
- wg.Wait()
-}
diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
index 5ab359d3..d256aad7 100644
--- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
+++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
@@ -1,3 +1,8 @@
logs:
mode: development
- level: error \ No newline at end of file
+ level: panic
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: panic
diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml
index 67d935e3..d6aaa7c6 100644
--- a/tests/plugins/rpc/configs/.rr.yaml
+++ b/tests/plugins/rpc/configs/.rr.yaml
@@ -1,5 +1,11 @@
rpc:
listen: tcp://127.0.0.1:6001
+
logs:
mode: development
- level: error \ No newline at end of file
+ level: panic
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: panic