summaryrefslogtreecommitdiff
path: root/plugins/kv/drivers/boltdb
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/drivers/boltdb')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go420
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go432
-rw-r--r--plugins/kv/drivers/boltdb/plugin_unit_test.go531
3 files changed, 433 insertions, 950 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
new file mode 100644
index 00000000..8c3bb68f
--- /dev/null
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -0,0 +1,420 @@
+package boltdb
+
+import (
+ "bytes"
+ "encoding/gob"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ bolt "go.etcd.io/bbolt"
+)
+
+type Driver struct {
+ // db instance
+ DB *bolt.DB
+ // name should be UTF-8
+ bucket []byte
+ log logger.Logger
+ cfg *Config
+ // gc contains key which are contain timeouts
+ gc sync.Map
+ // default timeout for cache cleanup is 1 minute
+ timeout time.Duration
+
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+}
+
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ d := &Driver{
+ log: log,
+ stop: stop,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.bucket = []byte(d.cfg.Bucket)
+ d.timeout = time.Duration(d.cfg.Interval) * time.Second
+ d.gc = sync.Map{}
+
+ // add default values
+ d.cfg.InitDefaults()
+
+ db, err := bolt.Open(path.Join(d.cfg.Dir, d.cfg.File), os.FileMode(d.cfg.Permissions), nil)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.DB = db
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.Bucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ go d.startGCLoop()
+
+ return d, nil
+}
+
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("boltdb_plugin_has")
+ d.log.Debug("boltdb HAS method called", "args", keys)
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+
+ // this is readable transaction
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ // Get retrieves the value for a key in the bucket.
+ // Returns a nil value if the key does not exist or if the key is a nested bucket.
+ // The returned value is only valid for the life of the transaction.
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ exist := b.Get([]byte(keys[i]))
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.log.Debug("boltdb HAS method finished")
+ return m, nil
+}
+
+// Get retrieves the value for a key in the bucket.
+// Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("boltdb_plugin_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ var val []byte
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ val = b.Get([]byte(key))
+
+ // try to decode values
+ if val != nil {
+ buf := bytes.NewReader(val)
+ decoder := gob.NewDecoder(buf)
+
+ var i string
+ err := decoder.Decode(&i)
+ if err != nil {
+ // unsafe (w/o runes) convert
+ return errors.E(op, err)
+ }
+
+ // set the value
+ val = []byte(i)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return val, nil
+}
+
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb_plugin_mget")
+ // defense
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ buf := new(bytes.Buffer)
+ var out string
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
+ if value != nil {
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = ""
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return m, nil
+}
+
+// Set puts the K/V to the bolt
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("boltdb_plugin_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // start writable transaction
+ tx, err := d.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(d.bucket)
+ // use access by index to avoid copying
+ for i := range items {
+ // performance note: pass a prepared bytes slice with initial cap
+ // we can't move buf and gob out of loop, because we need to clear both from data
+ // but gob will contain (w/o re-init) the past data
+ buf := bytes.Buffer{}
+ encoder := gob.NewEncoder(&buf)
+ if errors.Is(errors.EmptyItem, err) {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
+ err = b.Put([]byte(items[i].Key), buf.Bytes())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
+ // we do not need mutex here, since we use sync.Map
+ if items[i].TTL != "" {
+ // check correctness of provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // Store key TTL in the separate map
+ d.gc.Store(items[i].Key, items[i].TTL)
+ }
+
+ buf.Reset()
+ }
+
+ return nil
+}
+
+// Delete all keys from DB
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("boltdb_plugin_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ // start writable transaction
+ tx, err := d.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ for _, key := range keys {
+ err = b.Delete([]byte(key))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("boltdb_plugin_mexpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ d.gc.Store(items[i].Key, items[i].TTL)
+ }
+ return nil
+}
+
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb_plugin_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := d.gc.Load(keys[i]); ok {
+ // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
+ m[keys[i]] = item.(string)
+ }
+ }
+ return m, nil
+}
+
+// ========================= PRIVATE =================================
+
+func (d *Driver) startGCLoop() { //nolint:gocognit
+ go func() {
+ t := time.NewTicker(d.timeout)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ // calculate current time before loop started to be fair
+ now := time.Now()
+ d.gc.Range(func(key, value interface{}) bool {
+ const op = errors.Op("boltdb_plugin_gc")
+ k := key.(string)
+ v, err := time.Parse(time.RFC3339, value.(string))
+ if err != nil {
+ return false
+ }
+
+ if now.After(v) {
+ // time expired
+ d.gc.Delete(k)
+ d.log.Debug("key deleted", "key", k)
+ err := d.DB.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ err := b.Delete([]byte(k))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ d.log.Error("error during the gc phase of update", "error", err)
+ // todo this error is ignored, it means, that timer still be active
+ // to prevent this, we need to invoke t.Stop()
+ return false
+ }
+ }
+ return true
+ })
+ case <-d.stop:
+ err := d.DB.Close()
+ if err != nil {
+ d.log.Error("error")
+ }
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 98e2bf22..9d1e0dba 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -1,45 +1,23 @@
package boltdb
import (
- "bytes"
- "encoding/gob"
- "os"
- "path"
- "strings"
- "sync"
- "time"
-
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
- bolt "go.etcd.io/bbolt"
)
const PluginName = "boltdb"
// Plugin BoltDB K/V storage.
type Plugin struct {
- sync.Mutex
- // db instance
- DB *bolt.DB
- // name should be UTF-8
- bucket []byte
-
- // boltdb configuration
- cfg *Config
cfgPlugin config.Configurer
-
// logger
log logger.Logger
-
- // gc contains key which are contain timeouts
- gc *sync.Map
- // default timeout for cache cleanup is 1 minute
- timeout time.Duration
-
// stop is used to stop keys GC and close boltdb connection
stop chan struct{}
+
+ drivers uint
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
@@ -47,6 +25,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
return errors.E(errors.Disabled)
}
+ s.stop = make(chan struct{})
s.log = log
s.cfgPlugin = cfg
return nil
@@ -54,418 +33,33 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
// Serve is noop here
func (s *Plugin) Serve() chan error {
- s.Lock()
- defer s.Unlock()
return make(chan error, 1)
}
func (s *Plugin) Stop() error {
- s.Lock()
- defer s.Unlock()
- const op = errors.Op("boltdb_plugin_stop")
- if s.DB != nil {
- err := s.Close()
- if err != nil {
- return errors.E(op, err)
+ if s.drivers > 0 {
+ for i := uint(0); i < s.drivers; i++ {
+ // send close signal to every driver
+ s.stop <- struct{}{}
}
}
return nil
}
-func (s *Plugin) Configure(key string) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_configure")
- s.Lock()
- defer s.Unlock()
-
- err := s.cfgPlugin.UnmarshalKey(key, &s.cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- s.bucket = []byte(s.cfg.Bucket)
- s.stop = make(chan struct{}, 1)
- s.timeout = time.Duration(s.cfg.Interval) * time.Second
- s.gc = &sync.Map{}
-
- // add default values
- s.cfg.InitDefaults()
-
- db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- s.DB = db
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket))
- if err != nil {
- return errors.E(op, upOp)
- }
- return nil
- })
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // start the GC phase
- go s.startGCLoop()
-
- return s, nil
-}
-
-func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("boltdb_plugin_has")
- s.log.Debug("boltdb HAS method called", "args", keys)
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- m := make(map[string]bool, len(keys))
-
- // this is readable transaction
- err := s.DB.View(func(tx *bolt.Tx) error {
- // Get retrieves the value for a key in the bucket.
- // Returns a nil value if the key does not exist or if the key is a nested bucket.
- // The returned value is only valid for the life of the transaction.
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- b := tx.Bucket(s.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- exist := b.Get([]byte(keys[i]))
- if exist != nil {
- m[keys[i]] = true
- }
- }
- return nil
- })
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
return nil, errors.E(op, err)
}
- s.log.Debug("boltdb HAS method finished")
- return m, nil
-}
-
-// Get retrieves the value for a key in the bucket.
-// Returns a nil value if the key does not exist or if the key is a nested bucket.
-// The returned value is only valid for the life of the transaction.
-func (s *Plugin) Get(key string) ([]byte, error) {
- const op = errors.Op("boltdb_plugin_get")
- // to get cases like " "
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
-
- var val []byte
- err := s.DB.View(func(tx *bolt.Tx) error {
- b := tx.Bucket(s.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- val = b.Get([]byte(key))
-
- // try to decode values
- if val != nil {
- buf := bytes.NewReader(val)
- decoder := gob.NewDecoder(buf)
-
- var i string
- err := decoder.Decode(&i)
- if err != nil {
- // unsafe (w/o runes) convert
- return errors.E(op, err)
- }
-
- // set the value
- val = []byte(i)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return val, nil
-}
-
-func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_mget")
- // defense
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]interface{}, len(keys))
-
- err := s.DB.View(func(tx *bolt.Tx) error {
- b := tx.Bucket(s.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
-
- buf := new(bytes.Buffer)
- var out string
- buf.Grow(100)
- for i := range keys {
- value := b.Get([]byte(keys[i]))
- buf.Write(value)
- // allocate enough space
- dec := gob.NewDecoder(buf)
- if value != nil {
- err := dec.Decode(&out)
- if err != nil {
- return errors.E(op, err)
- }
- m[keys[i]] = out
- buf.Reset()
- out = ""
- }
- }
-
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return m, nil
-}
-
-// Set puts the K/V to the bolt
-func (s *Plugin) Set(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_set")
- if items == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // start writable transaction
- tx, err := s.DB.Begin(true)
- if err != nil {
- return errors.E(op, err)
- }
- defer func() {
- err = tx.Commit()
- if err != nil {
- errRb := tx.Rollback()
- if errRb != nil {
- s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
- }
- }
- }()
-
- b := tx.Bucket(s.bucket)
- // use access by index to avoid copying
- for i := range items {
- // performance note: pass a prepared bytes slice with initial cap
- // we can't move buf and gob out of loop, because we need to clear both from data
- // but gob will contain (w/o re-init) the past data
- buf := bytes.Buffer{}
- encoder := gob.NewEncoder(&buf)
- if errors.Is(errors.EmptyItem, err) {
- return errors.E(op, errors.EmptyItem)
- }
-
- // Encode value
- err = encoder.Encode(&items[i].Value)
- if err != nil {
- return errors.E(op, err)
- }
- // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
- err = b.Put([]byte(items[i].Key), buf.Bytes())
- if err != nil {
- return errors.E(op, err)
- }
-
- // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
- // we do not need mutex here, since we use sync.Map
- if items[i].TTL != "" {
- // check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
- if err != nil {
- return errors.E(op, err)
- }
- // Store key TTL in the separate map
- s.gc.Store(items[i].Key, items[i].TTL)
- }
-
- buf.Reset()
- }
+ // save driver number to release resources after Stop
+ s.drivers++
- return nil
-}
-
-// Delete all keys from DB
-func (s *Plugin) Delete(keys ...string) error {
- const op = errors.Op("boltdb_plugin_delete")
- if keys == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- }
-
- // start writable transaction
- tx, err := s.DB.Begin(true)
- if err != nil {
- return errors.E(op, err)
- }
-
- defer func() {
- err = tx.Commit()
- if err != nil {
- errRb := tx.Rollback()
- if errRb != nil {
- s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
- }
- }
- }()
-
- b := tx.Bucket(s.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
-
- for _, key := range keys {
- err = b.Delete([]byte(key))
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
-
-// MExpire sets the expiration time to the key
-// If key already has the expiration time, it will be overwritten
-func (s *Plugin) MExpire(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_mexpire")
- for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
- return errors.E(op, errors.Str("should set timeout and at least one key"))
- }
-
- // verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
- if err != nil {
- return errors.E(op, err)
- }
-
- s.gc.Store(items[i].Key, items[i].TTL)
- }
- return nil
-}
-
-func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_ttl")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]interface{}, len(keys))
-
- for i := range keys {
- if item, ok := s.gc.Load(keys[i]); ok {
- // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
- m[keys[i]] = item.(string)
- }
- }
- return m, nil
-}
-
-// Close the DB connection
-func (s *Plugin) Close() error {
- // stop the keys GC
- s.stop <- struct{}{}
-
- return s.DB.Close()
+ return st, nil
}
// Name returns plugin name
func (s *Plugin) Name() string {
return PluginName
}
-
-// ========================= PRIVATE =================================
-
-func (s *Plugin) startGCLoop() { //nolint:gocognit
- s.Lock()
- defer s.Unlock()
-
- go func() {
- t := time.NewTicker(s.timeout)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- // calculate current time before loop started to be fair
- now := time.Now()
- s.gc.Range(func(key, value interface{}) bool {
- const op = errors.Op("boltdb_plugin_gc")
- k := key.(string)
- v, err := time.Parse(time.RFC3339, value.(string))
- if err != nil {
- return false
- }
-
- if now.After(v) {
- // time expired
- s.gc.Delete(k)
- s.log.Debug("key deleted", "key", k)
- err := s.DB.Update(func(tx *bolt.Tx) error {
- b := tx.Bucket(s.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- err := b.Delete([]byte(k))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- s.log.Error("error during the gc phase of update", "error", err)
- // todo this error is ignored, it means, that timer still be active
- // to prevent this, we need to invoke t.Stop()
- return false
- }
- }
- return true
- })
- case <-s.stop:
- return
- }
- }
- }()
-}
diff --git a/plugins/kv/drivers/boltdb/plugin_unit_test.go b/plugins/kv/drivers/boltdb/plugin_unit_test.go
deleted file mode 100644
index 50c6c80f..00000000
--- a/plugins/kv/drivers/boltdb/plugin_unit_test.go
+++ /dev/null
@@ -1,531 +0,0 @@
-package boltdb
-
-import (
- "os"
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/stretchr/testify/assert"
- bolt "go.etcd.io/bbolt"
- "go.uber.org/zap"
-)
-
-// NewBoltClient instantiate new BOLTDB client
-// The parameters are:
-// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
-// perm os.FileMode -- file permissions, for example 0777
-// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
-// bucket string -- name of the bucket to use, should be UTF-8
-func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_new_bolt_client")
- db, err := bolt.Open(path, perm, options)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // bucket should be SET
- if bucket == "" {
- return nil, errors.E(op, errors.Str("bucket should be set"))
- }
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- _, err = tx.CreateBucketIfNotExists([]byte(bucket))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // if TTL is not set, make it default
- if ttl == 0 {
- ttl = time.Minute
- }
-
- l, _ := zap.NewDevelopment()
- s := &Plugin{
- DB: db,
- bucket: []byte(bucket),
- stop: make(chan struct{}),
- timeout: ttl,
- gc: &sync.Map{},
- log: logger.NewZapAdapter(l),
- }
-
- // start the TTL gc
- go s.startGCLoop()
-
- return s, nil
-}
-
-func initStorage() kv.Storage {
- storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second)
- if err != nil {
- panic(err)
- }
- return storage
-}
-
-func cleanup(t *testing.T, path string) {
- err := os.RemoveAll(path)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStorage_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-}
-
-func TestStorage_Has_Set_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-}
-
-func TestConcurrentReadWriteTransactions(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- wg := &sync.WaitGroup{}
- wg.Add(3)
-
- m := &sync.RWMutex{}
- // concurrently set the keys
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- // set is writable transaction
- // it should stop readable
- assert.NoError(t, s.Set(kv.Item{
- Key: "key" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }, kv.Item{
- Key: "key2" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }))
- m.Unlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.RLock()
- v, err = s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- m.RUnlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- err = s.Delete("key" + strconv.Itoa(i))
- assert.NoError(t, err)
- m.Unlock()
- }
- }(s)
-
- wg.Wait()
-}
-
-func TestStorage_Has_Set_MGet(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_Has_Set_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world2",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
-
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.Get("key")
- assert.NoError(t, err)
-
- if string(res) != "hello world" {
- t.Fatal("wrong value by key")
- }
-}
-
-func TestStorage_Set_Del_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- // check that keys are present
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-
- assert.NoError(t, s.Delete("key", "key2"))
- // check that keys are not present
- res, err = s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 0)
-}
-
-func TestStorage_Set_GetM(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestNilAndWrongArgs(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- // check
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- _, err = s.Has("")
- assert.Error(t, err)
-
- _, err = s.Get("")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", "")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", " ")
- assert.Error(t, err)
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }))
-
- assert.Error(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "asdf",
- }))
-
- _, err = s.Has("key")
- assert.NoError(t, err)
-
- assert.Error(t, s.Set(kv.Item{}))
-
- err = s.Delete("")
- assert.Error(t, err)
-
- err = s.Delete("key", "")
- assert.Error(t, err)
-
- err = s.Delete("key", " ")
- assert.Error(t, err)
-
- err = s.Delete("key")
- assert.NoError(t, err)
-}
-
-func TestStorage_MExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
- // set timeout to 5 sec
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- i1 := kv.Item{
- Key: "key",
- Value: "",
- TTL: nowPlusFive,
- }
- i2 := kv.Item{
- Key: "key2",
- Value: "",
- TTL: nowPlusFive,
- }
- assert.NoError(t, s.MExpire(i1, i2))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestStorage_SetExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- // set timeout to 5 sec
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: nowPlusFive,
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: nowPlusFive,
- }))
-
- time.Sleep(time.Second * 2)
- m, err := s.TTL("key", "key2")
- assert.NoError(t, err)
-
- // remove a precision 4.02342342 -> 4
- keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- // remove a precision 4.02342342 -> 4
- key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- assert.True(t, keyTTL < 5)
- assert.True(t, key2TTL < 5)
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}