diff options
author | Valery Piashchynski <[email protected]> | 2021-04-22 22:03:59 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-04-22 22:03:59 +0300 |
commit | 91c1fa2e2693cb662425c1ba7cca2325a8458995 (patch) | |
tree | d97791a6675678f607396af5de81143e764ca108 /plugins | |
parent | 1e62c2afa4fe8b5bae0c26e72ae61ef6b5f0f54d (diff) |
- Rework storage drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 420 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 432 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin_unit_test.go | 531 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 235 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 228 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin_unit_test.go | 432 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/driver.go | 229 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/plugin.go | 229 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/plugin_unit_test.go | 472 | ||||
-rw-r--r-- | plugins/kv/interface.go | 14 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 18 | ||||
-rw-r--r-- | plugins/kv/storage.go | 43 |
12 files changed, 949 insertions, 2334 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go new file mode 100644 index 00000000..8c3bb68f --- /dev/null +++ b/plugins/kv/drivers/boltdb/driver.go @@ -0,0 +1,420 @@ +package boltdb + +import ( + "bytes" + "encoding/gob" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" + bolt "go.etcd.io/bbolt" +) + +type Driver struct { + // db instance + DB *bolt.DB + // name should be UTF-8 + bucket []byte + log logger.Logger + cfg *Config + // gc contains key which are contain timeouts + gc sync.Map + // default timeout for cache cleanup is 1 minute + timeout time.Duration + + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} +} + +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) { + const op = errors.Op("new_boltdb_driver") + + d := &Driver{ + log: log, + stop: stop, + } + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + d.bucket = []byte(d.cfg.Bucket) + d.timeout = time.Duration(d.cfg.Interval) * time.Second + d.gc = sync.Map{} + + // add default values + d.cfg.InitDefaults() + + db, err := bolt.Open(path.Join(d.cfg.Dir, d.cfg.File), os.FileMode(d.cfg.Permissions), nil) + if err != nil { + return nil, errors.E(op, err) + } + + d.DB = db + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.Bucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + go d.startGCLoop() + + return d, nil +} + +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("boltdb_plugin_has") + d.log.Debug("boltdb HAS method called", "args", keys) + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + m := make(map[string]bool, len(keys)) + + // this is readable transaction + err := d.DB.View(func(tx *bolt.Tx) error { + // Get retrieves the value for a key in the bucket. + // Returns a nil value if the key does not exist or if the key is a nested bucket. + // The returned value is only valid for the life of the transaction. + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + exist := b.Get([]byte(keys[i])) + if exist != nil { + m[keys[i]] = true + } + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + d.log.Debug("boltdb HAS method finished") + return m, nil +} + +// Get retrieves the value for a key in the bucket. +// Returns a nil value if the key does not exist or if the key is a nested bucket. +// The returned value is only valid for the life of the transaction. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("boltdb_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + var val []byte + err := d.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + val = b.Get([]byte(key)) + + // try to decode values + if val != nil { + buf := bytes.NewReader(val) + decoder := gob.NewDecoder(buf) + + var i string + err := decoder.Decode(&i) + if err != nil { + // unsafe (w/o runes) convert + return errors.E(op, err) + } + + // set the value + val = []byte(i) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return val, nil +} + +func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("boltdb_plugin_mget") + // defense + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + err := d.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + + buf := new(bytes.Buffer) + var out string + buf.Grow(100) + for i := range keys { + value := b.Get([]byte(keys[i])) + buf.Write(value) + // allocate enough space + dec := gob.NewDecoder(buf) + if value != nil { + err := dec.Decode(&out) + if err != nil { + return errors.E(op, err) + } + m[keys[i]] = out + buf.Reset() + out = "" + } + } + + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return m, nil +} + +// Set puts the K/V to the bolt +func (d *Driver) Set(items ...kv.Item) error { + const op = errors.Op("boltdb_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + // start writable transaction + tx, err := d.DB.Begin(true) + if err != nil { + return errors.E(op, err) + } + defer func() { + err = tx.Commit() + if err != nil { + errRb := tx.Rollback() + if errRb != nil { + d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + } + } + }() + + b := tx.Bucket(d.bucket) + // use access by index to avoid copying + for i := range items { + // performance note: pass a prepared bytes slice with initial cap + // we can't move buf and gob out of loop, because we need to clear both from data + // but gob will contain (w/o re-init) the past data + buf := bytes.Buffer{} + encoder := gob.NewEncoder(&buf) + if errors.Is(errors.EmptyItem, err) { + return errors.E(op, errors.EmptyItem) + } + + // Encode value + err = encoder.Encode(&items[i].Value) + if err != nil { + return errors.E(op, err) + } + // buf.Bytes will copy the underlying slice. Take a look in case of performance problems + err = b.Put([]byte(items[i].Key), buf.Bytes()) + if err != nil { + return errors.E(op, err) + } + + // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check + // we do not need mutex here, since we use sync.Map + if items[i].TTL != "" { + // check correctness of provided TTL + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + // Store key TTL in the separate map + d.gc.Store(items[i].Key, items[i].TTL) + } + + buf.Reset() + } + + return nil +} + +// Delete all keys from DB +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("boltdb_plugin_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + // start writable transaction + tx, err := d.DB.Begin(true) + if err != nil { + return errors.E(op, err) + } + + defer func() { + err = tx.Commit() + if err != nil { + errRb := tx.Rollback() + if errRb != nil { + d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + } + } + }() + + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + + for _, key := range keys { + err = b.Delete([]byte(key)) + if err != nil { + return errors.E(op, err) + } + } + + return nil +} + +// MExpire sets the expiration time to the key +// If key already has the expiration time, it will be overwritten +func (d *Driver) MExpire(items ...kv.Item) error { + const op = errors.Op("boltdb_plugin_mexpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + + d.gc.Store(items[i].Key, items[i].TTL) + } + return nil +} + +func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("boltdb_plugin_ttl") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for i := range keys { + if item, ok := d.gc.Load(keys[i]); ok { + // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64 + m[keys[i]] = item.(string) + } + } + return m, nil +} + +// ========================= PRIVATE ================================= + +func (d *Driver) startGCLoop() { //nolint:gocognit + go func() { + t := time.NewTicker(d.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + d.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } + + if now.After(v) { + // time expired + d.gc.Delete(k) + d.log.Debug("key deleted", "key", k) + err := d.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) + if err != nil { + d.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false + } + } + return true + }) + case <-d.stop: + err := d.DB.Close() + if err != nil { + d.log.Error("error") + } + return + } + } + }() +} diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 98e2bf22..9d1e0dba 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -1,45 +1,23 @@ package boltdb import ( - "bytes" - "encoding/gob" - "os" - "path" - "strings" - "sync" - "time" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" - bolt "go.etcd.io/bbolt" ) const PluginName = "boltdb" // Plugin BoltDB K/V storage. type Plugin struct { - sync.Mutex - // db instance - DB *bolt.DB - // name should be UTF-8 - bucket []byte - - // boltdb configuration - cfg *Config cfgPlugin config.Configurer - // logger log logger.Logger - - // gc contains key which are contain timeouts - gc *sync.Map - // default timeout for cache cleanup is 1 minute - timeout time.Duration - // stop is used to stop keys GC and close boltdb connection stop chan struct{} + + drivers uint } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { @@ -47,6 +25,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { return errors.E(errors.Disabled) } + s.stop = make(chan struct{}) s.log = log s.cfgPlugin = cfg return nil @@ -54,418 +33,33 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - s.Lock() - defer s.Unlock() return make(chan error, 1) } func (s *Plugin) Stop() error { - s.Lock() - defer s.Unlock() - const op = errors.Op("boltdb_plugin_stop") - if s.DB != nil { - err := s.Close() - if err != nil { - return errors.E(op, err) + if s.drivers > 0 { + for i := uint(0); i < s.drivers; i++ { + // send close signal to every driver + s.stop <- struct{}{} } } return nil } -func (s *Plugin) Configure(key string) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_configure") - s.Lock() - defer s.Unlock() - - err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - s.bucket = []byte(s.cfg.Bucket) - s.stop = make(chan struct{}, 1) - s.timeout = time.Duration(s.cfg.Interval) * time.Second - s.gc = &sync.Map{} - - // add default values - s.cfg.InitDefaults() - - db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil) - if err != nil { - return nil, errors.E(op, err) - } - - s.DB = db - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket)) - if err != nil { - return errors.E(op, upOp) - } - return nil - }) - - if err != nil { - return nil, errors.E(op, err) - } - - // start the GC phase - go s.startGCLoop() - - return s, nil -} - -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("boltdb_plugin_has") - s.log.Debug("boltdb HAS method called", "args", keys) - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - m := make(map[string]bool, len(keys)) - - // this is readable transaction - err := s.DB.View(func(tx *bolt.Tx) error { - // Get retrieves the value for a key in the bucket. - // Returns a nil value if the key does not exist or if the key is a nested bucket. - // The returned value is only valid for the life of the transaction. - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - exist := b.Get([]byte(keys[i])) - if exist != nil { - m[keys[i]] = true - } - } - return nil - }) +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { return nil, errors.E(op, err) } - s.log.Debug("boltdb HAS method finished") - return m, nil -} - -// Get retrieves the value for a key in the bucket. -// Returns a nil value if the key does not exist or if the key is a nested bucket. -// The returned value is only valid for the life of the transaction. -func (s *Plugin) Get(key string) ([]byte, error) { - const op = errors.Op("boltdb_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - var val []byte - err := s.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - val = b.Get([]byte(key)) - - // try to decode values - if val != nil { - buf := bytes.NewReader(val) - decoder := gob.NewDecoder(buf) - - var i string - err := decoder.Decode(&i) - if err != nil { - // unsafe (w/o runes) convert - return errors.E(op, err) - } - - // set the value - val = []byte(i) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - return val, nil -} - -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("boltdb_plugin_mget") - // defense - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - - err := s.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - - buf := new(bytes.Buffer) - var out string - buf.Grow(100) - for i := range keys { - value := b.Get([]byte(keys[i])) - buf.Write(value) - // allocate enough space - dec := gob.NewDecoder(buf) - if value != nil { - err := dec.Decode(&out) - if err != nil { - return errors.E(op, err) - } - m[keys[i]] = out - buf.Reset() - out = "" - } - } - - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - return m, nil -} - -// Set puts the K/V to the bolt -func (s *Plugin) Set(items ...kv.Item) error { - const op = errors.Op("boltdb_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - // start writable transaction - tx, err := s.DB.Begin(true) - if err != nil { - return errors.E(op, err) - } - defer func() { - err = tx.Commit() - if err != nil { - errRb := tx.Rollback() - if errRb != nil { - s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) - } - } - }() - - b := tx.Bucket(s.bucket) - // use access by index to avoid copying - for i := range items { - // performance note: pass a prepared bytes slice with initial cap - // we can't move buf and gob out of loop, because we need to clear both from data - // but gob will contain (w/o re-init) the past data - buf := bytes.Buffer{} - encoder := gob.NewEncoder(&buf) - if errors.Is(errors.EmptyItem, err) { - return errors.E(op, errors.EmptyItem) - } - - // Encode value - err = encoder.Encode(&items[i].Value) - if err != nil { - return errors.E(op, err) - } - // buf.Bytes will copy the underlying slice. Take a look in case of performance problems - err = b.Put([]byte(items[i].Key), buf.Bytes()) - if err != nil { - return errors.E(op, err) - } - - // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check - // we do not need mutex here, since we use sync.Map - if items[i].TTL != "" { - // check correctness of provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return errors.E(op, err) - } - // Store key TTL in the separate map - s.gc.Store(items[i].Key, items[i].TTL) - } - - buf.Reset() - } + // save driver number to release resources after Stop + s.drivers++ - return nil -} - -// Delete all keys from DB -func (s *Plugin) Delete(keys ...string) error { - const op = errors.Op("boltdb_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - // start writable transaction - tx, err := s.DB.Begin(true) - if err != nil { - return errors.E(op, err) - } - - defer func() { - err = tx.Commit() - if err != nil { - errRb := tx.Rollback() - if errRb != nil { - s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) - } - } - }() - - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - - for _, key := range keys { - err = b.Delete([]byte(key)) - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (s *Plugin) MExpire(items ...kv.Item) error { - const op = errors.Op("boltdb_plugin_mexpire") - for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // verify provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return errors.E(op, err) - } - - s.gc.Store(items[i].Key, items[i].TTL) - } - return nil -} - -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("boltdb_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - - for i := range keys { - if item, ok := s.gc.Load(keys[i]); ok { - // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64 - m[keys[i]] = item.(string) - } - } - return m, nil -} - -// Close the DB connection -func (s *Plugin) Close() error { - // stop the keys GC - s.stop <- struct{}{} - - return s.DB.Close() + return st, nil } // Name returns plugin name func (s *Plugin) Name() string { return PluginName } - -// ========================= PRIVATE ================================= - -func (s *Plugin) startGCLoop() { //nolint:gocognit - s.Lock() - defer s.Unlock() - - go func() { - t := time.NewTicker(s.timeout) - defer t.Stop() - for { - select { - case <-t.C: - // calculate current time before loop started to be fair - now := time.Now() - s.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } - - if now.After(v) { - // time expired - s.gc.Delete(k) - s.log.Debug("key deleted", "key", k) - err := s.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete([]byte(k)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - s.log.Error("error during the gc phase of update", "error", err) - // todo this error is ignored, it means, that timer still be active - // to prevent this, we need to invoke t.Stop() - return false - } - } - return true - }) - case <-s.stop: - return - } - } - }() -} diff --git a/plugins/kv/drivers/boltdb/plugin_unit_test.go b/plugins/kv/drivers/boltdb/plugin_unit_test.go deleted file mode 100644 index 50c6c80f..00000000 --- a/plugins/kv/drivers/boltdb/plugin_unit_test.go +++ /dev/null @@ -1,531 +0,0 @@ -package boltdb - -import ( - "os" - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/stretchr/testify/assert" - bolt "go.etcd.io/bbolt" - "go.uber.org/zap" -) - -// NewBoltClient instantiate new BOLTDB client -// The parameters are: -// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created -// perm os.FileMode -- file permissions, for example 0777 -// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other -// bucket string -- name of the bucket to use, should be UTF-8 -func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_new_bolt_client") - db, err := bolt.Open(path, perm, options) - if err != nil { - return nil, errors.E(op, err) - } - - // bucket should be SET - if bucket == "" { - return nil, errors.E(op, errors.Str("bucket should be set")) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - // if TTL is not set, make it default - if ttl == 0 { - ttl = time.Minute - } - - l, _ := zap.NewDevelopment() - s := &Plugin{ - DB: db, - bucket: []byte(bucket), - stop: make(chan struct{}), - timeout: ttl, - gc: &sync.Map{}, - log: logger.NewZapAdapter(l), - } - - // start the TTL gc - go s.startGCLoop() - - return s, nil -} - -func initStorage() kv.Storage { - storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second) - if err != nil { - panic(err) - } - return storage -} - -func cleanup(t *testing.T, path string) { - err := os.RemoveAll(path) - if err != nil { - t.Fatal(err) - } -} - -func TestStorage_Has(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) -} - -func TestStorage_Has_Set_Has(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) -} - -func TestConcurrentReadWriteTransactions(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - wg := &sync.WaitGroup{} - wg.Add(3) - - m := &sync.RWMutex{} - // concurrently set the keys - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - // set is writable transaction - // it should stop readable - assert.NoError(t, s.Set(kv.Item{ - Key: "key" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - }, kv.Item{ - Key: "key2" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - })) - m.Unlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.RLock() - v, err = s.Has("key") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - m.RUnlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - err = s.Delete("key" + strconv.Itoa(i)) - assert.NoError(t, err) - m.Unlock() - } - }(s) - - wg.Wait() -} - -func TestStorage_Has_Set_MGet(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_Has_Set_Get(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world2", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.Get("key") - assert.NoError(t, err) - - if string(res) != "hello world" { - t.Fatal("wrong value by key") - } -} - -func TestStorage_Set_Del_Get(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - // check that keys are present - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) - - assert.NoError(t, s.Delete("key", "key2")) - // check that keys are not present - res, err = s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 0) -} - -func TestStorage_Set_GetM(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestNilAndWrongArgs(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - // check - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - _, err = s.Has("") - assert.Error(t, err) - - _, err = s.Get("") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", "") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", " ") - assert.Error(t, err) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - })) - - assert.Error(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "asdf", - })) - - _, err = s.Has("key") - assert.NoError(t, err) - - assert.Error(t, s.Set(kv.Item{})) - - err = s.Delete("") - assert.Error(t, err) - - err = s.Delete("key", "") - assert.Error(t, err) - - err = s.Delete("key", " ") - assert.Error(t, err) - - err = s.Delete("key") - assert.NoError(t, err) -} - -func TestStorage_MExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - // set timeout to 5 sec - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - i1 := kv.Item{ - Key: "key", - Value: "", - TTL: nowPlusFive, - } - i2 := kv.Item{ - Key: "key2", - Value: "", - TTL: nowPlusFive, - } - assert.NoError(t, s.MExpire(i1, i2)) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestStorage_SetExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - // set timeout to 5 sec - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: nowPlusFive, - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: nowPlusFive, - })) - - time.Sleep(time.Second * 2) - m, err := s.TTL("key", "key2") - assert.NoError(t, err) - - // remove a precision 4.02342342 -> 4 - keyTTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - // remove a precision 4.02342342 -> 4 - key2TTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - assert.True(t, keyTTL < 5) - assert.True(t, key2TTL < 5) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go new file mode 100644 index 00000000..17b06fa0 --- /dev/null +++ b/plugins/kv/drivers/memcached/driver.go @@ -0,0 +1,235 @@ +package memcached + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Driver struct { + client *memcache.Client + log logger.Logger + cfg *Config +} + +// NewMemcachedDriver returns a memcache client using the provided server(s) +// with equal weight. If a server is listed multiple times, +// it gets a proportional amount of weight. +func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) { + const op = errors.Op("new_memcached_driver") + + s := &Driver{ + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + s.cfg.InitDefaults() + + m := memcache.New(s.cfg.Addr...) + s.client = m + + return s, nil +} + +// Has checks the key for existence +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool, len(keys)) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + exist, err := d.client.Get(keys[i]) + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if exist != nil { + m[keys[i]] = true + } + } + return m, nil +} + +// Get gets the item for the given key. ErrCacheMiss is returned for a +// memcache cache miss. The key must be at most 250 bytes in length. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("memcached_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + data, err := d.client.Get(key) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) + } + if data != nil { + // return the value by the key + return data.Value, nil + } + // data is nil by some reason and error also nil + return nil, nil +} + +// MGet return map with key -- string +// and map value as value -- []byte +func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("memcached_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + for i := range keys { + // Here also MultiGet + data, err := d.client.Get(keys[i]) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if data != nil { + m[keys[i]] = data.Value + } + } + + return m, nil +} + +// Set sets the KV pairs. Keys should be 250 bytes maximum +// TTL: +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) Set(items ...kv.Item) error { + const op = errors.Op("memcached_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == EmptyItem { + return errors.E(op, errors.EmptyItem) + } + + // pre-allocate item + memcachedItem := &memcache.Item{ + Key: items[i].Key, + // unsafe convert + Value: []byte(items[i].Value), + Flags: 0, + } + + // add additional TTL in case of TTL isn't empty + if items[i].TTL != "" { + // verify the TTL + t, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return err + } + memcachedItem.Expiration = int32(t.Unix()) + } + + err := d.client.Set(memcachedItem) + if err != nil { + return err + } + } + + return nil +} + +// MExpire Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) MExpire(items ...kv.Item) error { + const op = errors.Op("memcached_plugin_mexpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + t, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + + // Touch updates the expiry for the given key. The seconds parameter is either + // a Unix timestamp or, if seconds is less than 1 month, the number of seconds + // into the future at which time the item will expire. Zero means the item has + // no expiration time. ErrCacheMiss is returned if the key is not in the cache. + // The key must be at most 250 bytes in length. + err = d.client.Touch(items[i].Key, int32(t.Unix())) + if err != nil { + return errors.E(op, err) + } + } + return nil +} + +// TTL return time in seconds (int32) for a given keys +func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) { + const op = errors.Op("memcached_plugin_ttl") + return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) +} + +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + err := d.client.Delete(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) + } + } + return nil +} diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index dff47cb8..af59e91b 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -1,10 +1,6 @@ package memcached import ( - "strings" - "time" - - "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" @@ -16,23 +12,10 @@ const PluginName = "memcached" var EmptyItem = kv.Item{} type Plugin struct { - // config - cfg *Config + // config plugin cfgPlugin config.Configurer // logger log logger.Logger - // memcached client - client *memcache.Client -} - -// NewMemcachedClient returns a memcache client using the provided server(s) -// with equal weight. If a server is listed multiple times, -// it gets a proportional amount of weight. -func NewMemcachedClient(url ...string) kv.Storage { - m := memcache.New(url...) - return &Plugin{ - client: m, - } } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { @@ -50,213 +33,12 @@ func (s *Plugin) Name() string { return PluginName } -func (s *Plugin) Configure(key string) (kv.Storage, error) { - const op = errors.Op("memcached_plugin_configure") - err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { return nil, errors.E(op, err) } - // initialize default keys - s.cfg.InitDefaults() - - return NewMemcachedClient(s.cfg.Addr...), nil -} - -// Has checks the key for existence -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool, len(keys)) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - exist, err := s.client.Get(keys[i]) - - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if exist != nil { - m[keys[i]] = true - } - } - return m, nil -} - -// Get gets the item for the given key. ErrCacheMiss is returned for a -// memcache cache miss. The key must be at most 250 bytes in length. -func (s *Plugin) Get(key string) ([]byte, error) { - const op = errors.Op("memcached_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - data, err := s.client.Get(key) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - return nil, nil - } - return nil, errors.E(op, err) - } - if data != nil { - // return the value by the key - return data.Value, nil - } - // data is nil by some reason and error also nil - return nil, nil -} - -// MGet return map with key -- string -// and map value as value -- []byte -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("memcached_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - for i := range keys { - // Here also MultiGet - data, err := s.client.Get(keys[i]) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if data != nil { - m[keys[i]] = data.Value - } - } - - return m, nil -} - -// Set sets the KV pairs. Keys should be 250 bytes maximum -// TTL: -// Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (s *Plugin) Set(items ...kv.Item) error { - const op = errors.Op("memcached_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == EmptyItem { - return errors.E(op, errors.EmptyItem) - } - - // pre-allocate item - memcachedItem := &memcache.Item{ - Key: items[i].Key, - // unsafe convert - Value: []byte(items[i].Value), - Flags: 0, - } - - // add additional TTL in case of TTL isn't empty - if items[i].TTL != "" { - // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return err - } - memcachedItem.Expiration = int32(t.Unix()) - } - - err := s.client.Set(memcachedItem) - if err != nil { - return err - } - } - - return nil -} - -// MExpire Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (s *Plugin) MExpire(items ...kv.Item) error { - const op = errors.Op("memcached_plugin_mexpire") - for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return errors.E(op, err) - } - - // Touch updates the expiry for the given key. The seconds parameter is either - // a Unix timestamp or, if seconds is less than 1 month, the number of seconds - // into the future at which time the item will expire. Zero means the item has - // no expiration time. ErrCacheMiss is returned if the key is not in the cache. - // The key must be at most 250 bytes in length. - err = s.client.Touch(items[i].Key, int32(t.Unix())) - if err != nil { - return errors.E(op, err) - } - } - return nil -} - -// TTL return time in seconds (int32) for a given keys -func (s *Plugin) TTL(_ ...string) (map[string]interface{}, error) { - const op = errors.Op("memcached_plugin_ttl") - return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) -} - -func (s *Plugin) Delete(keys ...string) error { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - err := s.client.Delete(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return errors.E(op, err) - } - } - return nil -} - -func (s *Plugin) Close() error { - return nil + return st, nil } diff --git a/plugins/kv/drivers/memcached/plugin_unit_test.go b/plugins/kv/drivers/memcached/plugin_unit_test.go deleted file mode 100644 index 31423627..00000000 --- a/plugins/kv/drivers/memcached/plugin_unit_test.go +++ /dev/null @@ -1,432 +0,0 @@ -package memcached - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/stretchr/testify/assert" -) - -func initStorage() kv.Storage { - return NewMemcachedClient("localhost:11211") -} - -func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(keys...) - if err != nil { - t.Fatalf("error during cleanup: %s", err.Error()) - } -} - -func TestStorage_Has(t *testing.T) { - s := initStorage() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) -} - -func TestStorage_Has_Set_Has(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) -} - -func TestStorage_Has_Set_MGet(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_Has_Set_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.Get("key") - assert.NoError(t, err) - - if string(res) != "hello world" { - t.Fatal("wrong value by key") - } -} - -func TestStorage_Set_Del_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - // check that keys are present - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) - - assert.NoError(t, s.Delete("key", "key2")) - // check that keys are not present - res, err = s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 0) -} - -func TestStorage_Set_GetM(t *testing.T) { - s := initStorage() - - defer func() { - cleanup(t, s, "key", "key2") - - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_MExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - // set timeout to 5 sec - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - i1 := kv.Item{ - Key: "key", - Value: "", - TTL: nowPlusFive, - } - i2 := kv.Item{ - Key: "key2", - Value: "", - TTL: nowPlusFive, - } - assert.NoError(t, s.MExpire(i1, i2)) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestNilAndWrongArgs(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key") - if err := s.Close(); err != nil { - panic(err) - } - }() - - // check - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - _, err = s.Has("") - assert.Error(t, err) - - _, err = s.Get("") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", "") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", " ") - assert.Error(t, err) - - assert.Error(t, s.Set(kv.Item{})) - - err = s.Delete("") - assert.Error(t, err) - - err = s.Delete("key", "") - assert.Error(t, err) - - err = s.Delete("key", " ") - assert.Error(t, err) - - err = s.Delete("key") - assert.NoError(t, err) -} - -func TestStorage_SetExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - // set timeout to 5 sec - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: nowPlusFive, - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: nowPlusFive, - })) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestConcurrentReadWriteTransactions(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - wg := &sync.WaitGroup{} - wg.Add(3) - - m := &sync.RWMutex{} - // concurrently set the keys - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - // set is writable transaction - // it should stop readable - assert.NoError(t, s.Set(kv.Item{ - Key: "key" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - }, kv.Item{ - Key: "key2" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - })) - m.Unlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.RLock() - v, err = s.Has("key") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - m.RUnlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - err = s.Delete("key" + strconv.Itoa(i)) - assert.NoError(t, err) - m.Unlock() - } - }(s) - - wg.Wait() -} diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go new file mode 100644 index 00000000..1e0d03d4 --- /dev/null +++ b/plugins/kv/drivers/memory/driver.go @@ -0,0 +1,229 @@ +package memory + +import ( + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Driver struct { + heap sync.Map + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} + log logger.Logger + cfg *Config +} + +func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) { + const op = errors.Op("new_in_memory_driver") + + d := &Driver{ + stop: stop, + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + d.cfg.InitDefaults() + + go d.gc() + + return d, nil +} + +func (s *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("in_memory_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + if _, ok := s.heap.Load(keys[i]); ok { + m[keys[i]] = true + } + } + + return m, nil +} + +func (s *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("in_memory_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + if data, exist := s.heap.Load(key); exist { + // here might be a panic + // but data only could be a string, see Set function + return []byte(data.(kv.Item).Value), nil + } + return nil, nil +} + +func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("in_memory_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for i := range keys { + if value, ok := s.heap.Load(keys[i]); ok { + m[keys[i]] = value.(kv.Item).Value + } + } + + return m, nil +} + +func (s *Driver) Set(items ...kv.Item) error { + const op = errors.Op("in_memory_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + // TTL is set + if items[i].TTL != "" { + // check the TTL in the item + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return err + } + } + + s.heap.Store(items[i].Key, items[i]) + } + return nil +} + +// MExpire sets the expiration time to the key +// If key already has the expiration time, it will be overwritten +func (s *Driver) MExpire(items ...kv.Item) error { + const op = errors.Op("in_memory_plugin_mexpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // if key exist, overwrite it value + if pItem, ok := s.heap.Load(items[i].Key); ok { + // check that time is correct + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + tmp := pItem.(kv.Item) + // guess that t is in the future + // in memory is just FOR TESTING PURPOSES + // LOGIC ISN'T IDEAL + s.heap.Store(items[i].Key, kv.Item{ + Key: items[i].Key, + Value: tmp.Value, + TTL: items[i].TTL, + }) + } + } + + return nil +} + +func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("in_memory_plugin_ttl") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for i := range keys { + if item, ok := s.heap.Load(keys[i]); ok { + m[keys[i]] = item.(kv.Item).TTL + } + } + return m, nil +} + +func (s *Driver) Delete(keys ...string) error { + const op = errors.Op("in_memory_plugin_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + s.heap.Delete(keys[i]) + } + return nil +} + +// ================================== PRIVATE ====================================== + +func (s *Driver) gc() { + ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) + for { + select { + case <-s.stop: + ticker.Stop() + return + case now := <-ticker.C: + // check every second + s.heap.Range(func(key, value interface{}) bool { + v := value.(kv.Item) + if v.TTL == "" { + return true + } + + t, err := time.Parse(time.RFC3339, v.TTL) + if err != nil { + return false + } + + if now.After(t) { + s.log.Debug("key deleted", "key", key) + s.heap.Delete(key) + } + return true + }) + } + } +} diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 91fe89d3..acc6023d 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -1,10 +1,6 @@ package memory import ( - "strings" - "sync" - "time" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" @@ -16,12 +12,11 @@ const PluginName = "memory" type Plugin struct { // heap is user map for the key-value pairs - heap sync.Map stop chan struct{} log logger.Logger - cfg *Config cfgPlugin config.Configurer + drivers uint } func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -37,231 +32,33 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh + return make(chan error, 1) } func (s *Plugin) Stop() error { - const op = errors.Op("in_memory_plugin_stop") - err := s.Close() - if err != nil { - return errors.E(op, err) + if s.drivers > 0 { + for i := uint(0); i < s.drivers; i++ { + // send close signal to every driver + s.stop <- struct{}{} + } } return nil } -func (s *Plugin) Configure(key string) (kv.Storage, error) { - const op = errors.Op("inmemory_plugin_configure") - err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("inmemory_plugin_provide") + st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { return nil, errors.E(op, err) } - // initialize default keys - s.cfg.InitDefaults() + // save driver number to release resources after Stop + s.drivers++ - // start in-memory gc for kv - go s.gc() - - return s, nil -} - -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("in_memory_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if _, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = true - } - } - - return m, nil -} - -func (s *Plugin) Get(key string) ([]byte, error) { - const op = errors.Op("in_memory_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if data, exist := s.heap.Load(key); exist { - // here might be a panic - // but data only could be a string, see Set function - return []byte(data.(kv.Item).Value), nil - } - return nil, nil -} - -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("in_memory_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - - for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(kv.Item).Value - } - } - - return m, nil -} - -func (s *Plugin) Set(items ...kv.Item) error { - const op = errors.Op("in_memory_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - // TTL is set - if items[i].TTL != "" { - // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return err - } - } - - s.heap.Store(items[i].Key, items[i]) - } - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (s *Plugin) MExpire(items ...kv.Item) error { - const op = errors.Op("in_memory_plugin_mexpire") - for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // if key exist, overwrite it value - if pItem, ok := s.heap.Load(items[i].Key); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return errors.E(op, err) - } - tmp := pItem.(kv.Item) - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, kv.Item{ - Key: items[i].Key, - Value: tmp.Value, - TTL: items[i].TTL, - }) - } - } - - return nil -} - -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("in_memory_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - - for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(kv.Item).TTL - } - } - return m, nil -} - -func (s *Plugin) Delete(keys ...string) error { - const op = errors.Op("in_memory_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - s.heap.Delete(keys[i]) - } - return nil -} - -// Close clears the in-memory storage -func (s *Plugin) Close() error { - s.stop <- struct{}{} - return nil + return st, nil } // Name returns plugin user-friendly name func (s *Plugin) Name() string { return PluginName } - -// ================================== PRIVATE ====================================== - -func (s *Plugin) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) - for { - select { - case <-s.stop: - ticker.Stop() - return - case now := <-ticker.C: - // check every second - s.heap.Range(func(key, value interface{}) bool { - v := value.(kv.Item) - if v.TTL == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.TTL) - if err != nil { - return false - } - - if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) - } - return true - }) - } - } -} diff --git a/plugins/kv/drivers/memory/plugin_unit_test.go b/plugins/kv/drivers/memory/plugin_unit_test.go deleted file mode 100644 index 1965a696..00000000 --- a/plugins/kv/drivers/memory/plugin_unit_test.go +++ /dev/null @@ -1,472 +0,0 @@ -package memory - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func initStorage() kv.Storage { - p := &Plugin{ - stop: make(chan struct{}), - } - p.cfg = &Config{ - Interval: 1, - } - - l, _ := zap.NewDevelopment() - p.log = logger.NewZapAdapter(l) - - go p.gc() - - return p -} - -func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(keys...) - if err != nil { - t.Fatalf("error during cleanup: %s", err.Error()) - } -} - -func TestStorage_Has(t *testing.T) { - s := initStorage() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) -} - -func TestStorage_Has_Set_Has(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) -} - -func TestStorage_Has_Set_MGet(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_Has_Set_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.Get("key") - assert.NoError(t, err) - - if string(res) != "value" { - t.Fatal("wrong value by key") - } -} - -func TestStorage_Set_Del_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - // check that keys are present - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) - - assert.NoError(t, s.Delete("key", "key2")) - // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm - res, err = s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 0) -} - -func TestStorage_Set_GetM(t *testing.T) { - s := initStorage() - - defer func() { - cleanup(t, s, "key", "key2") - - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_MExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - // set timeout to 5 sec - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - i1 := kv.Item{ - Key: "key", - Value: "", - TTL: nowPlusFive, - } - i2 := kv.Item{ - Key: "key2", - Value: "", - TTL: nowPlusFive, - } - assert.NoError(t, s.MExpire(i1, i2)) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestNilAndWrongArgs(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - }() - - // check - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - _, err = s.Has("") - assert.Error(t, err) - - _, err = s.Get("") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", "") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", " ") - assert.Error(t, err) - - assert.NoError(t, s.Set(kv.Item{})) - _, err = s.Has("key") - assert.NoError(t, err) - - err = s.Delete("") - assert.Error(t, err) - - err = s.Delete("key", "") - assert.Error(t, err) - - err = s.Delete("key", " ") - assert.Error(t, err) - - err = s.Delete("key") - assert.NoError(t, err) -} - -func TestStorage_SetExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - // set timeout to 5 sec - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: nowPlusFive, - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: nowPlusFive, - })) - - time.Sleep(time.Second * 2) - m, err := s.TTL("key", "key2") - assert.NoError(t, err) - - // remove a precision 4.02342342 -> 4 - keyTTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - // remove a precision 4.02342342 -> 4 - key2TTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - assert.True(t, keyTTL < 5) - assert.True(t, key2TTL < 5) - - time.Sleep(time.Second * 4) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestConcurrentReadWriteTransactions(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - wg := &sync.WaitGroup{} - wg.Add(3) - - m := &sync.RWMutex{} - // concurrently set the keys - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - // set is writable transaction - // it should stop readable - assert.NoError(t, s.Set(kv.Item{ - Key: "key" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - }, kv.Item{ - Key: "key2" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - })) - m.Unlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.RLock() - v, err = s.Has("key") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - m.RUnlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - err = s.Delete("key" + strconv.Itoa(i)) - assert.NoError(t, err) - m.Unlock() - } - }(s) - - wg.Wait() -} diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 6c2a66f2..20dbb8b3 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -35,11 +35,15 @@ type Storage interface { // Delete one or multiple keys. Delete(keys ...string) error +} - // Close closes the storage and underlying resources. - Close() error +// StorageDriver interface provide storage +type StorageDriver interface { + Provider +} - // Configure used to configure storage - // key - yaml config key, for example kv.boltdb-north - Configure(key string) (Storage, error) +// Provider provides storage based on the config +type Provider interface { + // Provide provides Storage based on the config key + Provide(key string) (Storage, error) } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index cc3875e0..4947dbe3 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -216,24 +216,6 @@ func (r *rpc) Delete(in []byte, ok *bool) error { return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// Close closes the storage connection -func (r *rpc) Close(storage string, ok *bool) error { - const op = errors.Op("rpc_close") - if st, exists := r.storages[storage]; exists { - err := st.Close() - if err != nil { - return errors.E(op, err) - } - - // save the result - *ok = true - return nil - } - - *ok = false - return nil -} - func strConvert(s []byte) string { return *(*string)(unsafe.Pointer(&s)) } diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index b779665b..42e0ef3e 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -25,11 +25,12 @@ const ( type Plugin struct { log logger.Logger // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. - drivers map[string]Storage + drivers map[string]StorageDriver // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. storages map[string]Storage // KV configuration - cfg Config + cfg Config + cfgPlugin config.Configurer } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -42,9 +43,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.drivers = make(map[string]Storage, 5) + p.drivers = make(map[string]StorageDriver, 5) p.storages = make(map[string]Storage, 5) p.log = log + p.cfgPlugin = cfg return nil } @@ -97,7 +99,8 @@ func (p *Plugin) Serve() chan error { if _, ok := p.drivers[memcached]; !ok { continue } - storage, err := p.drivers[memcached].Configure(configKey) + + storage, err := p.drivers[memcached].Provide(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -105,11 +108,13 @@ func (p *Plugin) Serve() chan error { // save the storage p.storages[k] = storage + case boltdb: if _, ok := p.drivers[boltdb]; !ok { continue } - storage, err := p.drivers[boltdb].Configure(configKey) + + storage, err := p.drivers[boltdb].Provide(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -121,19 +126,8 @@ func (p *Plugin) Serve() chan error { if _, ok := p.drivers[memory]; !ok { continue } - storage, err := p.drivers[memory].Configure(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - // save the storage - p.storages[k] = storage - case redis: - if _, ok := p.drivers[redis]; !ok { - continue - } - storage, err := p.drivers[redis].Configure(configKey) + storage, err := p.drivers[memory].Provide(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -141,6 +135,19 @@ func (p *Plugin) Serve() chan error { // save the storage p.storages[k] = storage + + case redis: + // if _, ok := p.drivers[redis]; !ok { + // continue + // } + // storage, err := p.drivers[redis].Configure(configKey) + // if err != nil { + // errCh <- errors.E(op, err) + // return errCh + // } + // + // // save the storage + // p.storages[k] = storage default: errCh <- errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])) } @@ -160,7 +167,7 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage Storage) { +func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) { // save the storage driver p.drivers[name.Name()] = storage } |