summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/kv
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/drivers/boltdb/config.go30
-rw-r--r--plugins/kv/drivers/boltdb/driver.go459
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go71
-rw-r--r--plugins/kv/plugin.go4
4 files changed, 2 insertions, 562 deletions
diff --git a/plugins/kv/drivers/boltdb/config.go b/plugins/kv/drivers/boltdb/config.go
deleted file mode 100644
index 0beb209b..00000000
--- a/plugins/kv/drivers/boltdb/config.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package boltdb
-
-type Config struct {
- // File is boltDB file. No need to create it by your own,
- // boltdb driver is able to create the file, or read existing
- File string
- // Bucket to store data in boltDB
- bucket string
- // db file permissions
- Permissions int
- // timeout
- Interval int `mapstructure:"interval"`
-}
-
-// InitDefaults initializes default values for the boltdb
-func (s *Config) InitDefaults() {
- s.bucket = "default"
-
- if s.File == "" {
- s.File = "rr.db" // default file name
- }
-
- if s.Permissions == 0 {
- s.Permissions = 0777 // free for all
- }
-
- if s.Interval == 0 {
- s.Interval = 60 // default is 60 seconds timeout
- }
-}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
deleted file mode 100644
index 15a5674f..00000000
--- a/plugins/kv/drivers/boltdb/driver.go
+++ /dev/null
@@ -1,459 +0,0 @@
-package boltdb
-
-import (
- "bytes"
- "encoding/gob"
- "os"
- "strings"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
- "github.com/spiral/roadrunner/v2/utils"
- bolt "go.etcd.io/bbolt"
-)
-
-type Driver struct {
- clearMu sync.RWMutex
- // db instance
- DB *bolt.DB
- // name should be UTF-8
- bucket []byte
- log logger.Logger
- cfg *Config
- // gc contains key which are contain timeouts
- gc sync.Map
- // default timeout for cache cleanup is 1 minute
- timeout time.Duration
-
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-}
-
-func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
- const op = errors.Op("new_boltdb_driver")
-
- d := &Driver{
- log: log,
- stop: stop,
- }
-
- err := cfgPlugin.UnmarshalKey(key, &d.cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // add default values
- d.cfg.InitDefaults()
-
- d.bucket = []byte(d.cfg.bucket)
- d.timeout = time.Duration(d.cfg.Interval) * time.Second
- d.gc = sync.Map{}
-
- db, err := bolt.Open(d.cfg.File, os.FileMode(d.cfg.Permissions), &bolt.Options{
- Timeout: time.Second * 20,
- NoGrowSync: false,
- NoFreelistSync: false,
- ReadOnly: false,
- NoSync: false,
- })
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- d.DB = db
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.bucket))
- if err != nil {
- return errors.E(op, upOp)
- }
- return nil
- })
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- go d.startGCLoop()
-
- return d, nil
-}
-
-func (d *Driver) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("boltdb_driver_has")
- d.log.Debug("boltdb HAS method called", "args", keys)
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- m := make(map[string]bool, len(keys))
-
- // this is readable transaction
- err := d.DB.View(func(tx *bolt.Tx) error {
- // Get retrieves the value for a key in the bucket.
- // Returns a nil value if the key does not exist or if the key is a nested bucket.
- // The returned value is only valid for the life of the transaction.
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- b := tx.Bucket(d.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- exist := b.Get([]byte(keys[i]))
- if exist != nil {
- m[keys[i]] = true
- }
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- d.log.Debug("boltdb HAS method finished")
- return m, nil
-}
-
-// Get retrieves the value for a key in the bucket.
-// Returns a nil value if the key does not exist or if the key is a nested bucket.
-// The returned value is only valid for the life of the transaction.
-func (d *Driver) Get(key string) ([]byte, error) {
- const op = errors.Op("boltdb_driver_get")
- // to get cases like " "
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
-
- var val []byte
- err := d.DB.View(func(tx *bolt.Tx) error {
- b := tx.Bucket(d.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- val = b.Get([]byte(key))
-
- // try to decode values
- if val != nil {
- buf := bytes.NewReader(val)
- decoder := gob.NewDecoder(buf)
-
- var i string
- err := decoder.Decode(&i)
- if err != nil {
- // unsafe (w/o runes) convert
- return errors.E(op, err)
- }
-
- // set the value
- val = []byte(i)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return val, nil
-}
-
-func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
- const op = errors.Op("boltdb_driver_mget")
- // defense
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string][]byte, len(keys))
-
- err := d.DB.View(func(tx *bolt.Tx) error {
- b := tx.Bucket(d.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
-
- buf := new(bytes.Buffer)
- var out []byte
- buf.Grow(100)
- for i := range keys {
- value := b.Get([]byte(keys[i]))
- buf.Write(value)
- // allocate enough space
- dec := gob.NewDecoder(buf)
- if value != nil {
- err := dec.Decode(&out)
- if err != nil {
- return errors.E(op, err)
- }
- m[keys[i]] = out
- buf.Reset()
- out = nil
- }
- }
-
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return m, nil
-}
-
-// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...*kvv1.Item) error {
- const op = errors.Op("boltdb_driver_set")
- if items == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // start writable transaction
- tx, err := d.DB.Begin(true)
- if err != nil {
- return errors.E(op, err)
- }
- defer func() {
- err = tx.Commit()
- if err != nil {
- errRb := tx.Rollback()
- if errRb != nil {
- d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
- }
- }
- }()
-
- b := tx.Bucket(d.bucket)
- // use access by index to avoid copying
- for i := range items {
- // performance note: pass a prepared bytes slice with initial cap
- // we can't move buf and gob out of loop, because we need to clear both from data
- // but gob will contain (w/o re-init) the past data
- buf := new(bytes.Buffer)
- encoder := gob.NewEncoder(buf)
- if errors.Is(errors.EmptyItem, err) {
- return errors.E(op, errors.EmptyItem)
- }
-
- // Encode value
- err = encoder.Encode(&items[i].Value)
- if err != nil {
- return errors.E(op, err)
- }
- // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
- err = b.Put([]byte(items[i].Key), buf.Bytes())
- if err != nil {
- return errors.E(op, err)
- }
-
- // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
- // we do not need mutex here, since we use sync.Map
- if items[i].Timeout != "" {
- // check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].Timeout)
- if err != nil {
- return errors.E(op, err)
- }
- // Store key TTL in the separate map
- d.gc.Store(items[i].Key, items[i].Timeout)
- }
-
- buf.Reset()
- }
-
- return nil
-}
-
-// Delete all keys from DB
-func (d *Driver) Delete(keys ...string) error {
- const op = errors.Op("boltdb_driver_delete")
- if keys == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- }
-
- // start writable transaction
- tx, err := d.DB.Begin(true)
- if err != nil {
- return errors.E(op, err)
- }
-
- defer func() {
- err = tx.Commit()
- if err != nil {
- errRb := tx.Rollback()
- if errRb != nil {
- d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
- }
- }
- }()
-
- b := tx.Bucket(d.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
-
- for _, key := range keys {
- err = b.Delete([]byte(key))
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}
-
-// MExpire sets the expiration time to the key
-// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...*kvv1.Item) error {
- const op = errors.Op("boltdb_driver_mexpire")
- for i := range items {
- if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
- return errors.E(op, errors.Str("should set timeout and at least one key"))
- }
-
- // verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].Timeout)
- if err != nil {
- return errors.E(op, err)
- }
-
- d.gc.Store(items[i].Key, items[i].Timeout)
- }
- return nil
-}
-
-func (d *Driver) TTL(keys ...string) (map[string]string, error) {
- const op = errors.Op("boltdb_driver_ttl")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]string, len(keys))
-
- for i := range keys {
- if item, ok := d.gc.Load(keys[i]); ok {
- // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
- m[keys[i]] = item.(string)
- }
- }
- return m, nil
-}
-
-func (d *Driver) Clear() error {
- err := d.DB.Update(func(tx *bolt.Tx) error {
- err := tx.DeleteBucket(d.bucket)
- if err != nil {
- d.log.Error("boltdb delete bucket", "error", err)
- return err
- }
-
- _, err = tx.CreateBucket(d.bucket)
- if err != nil {
- d.log.Error("boltdb create bucket", "error", err)
- return err
- }
-
- return nil
- })
-
- if err != nil {
- d.log.Error("clear transaction failed", "error", err)
- return err
- }
-
- d.clearMu.Lock()
- d.gc = sync.Map{}
- d.clearMu.Unlock()
-
- return nil
-}
-
-// ========================= PRIVATE =================================
-
-func (d *Driver) startGCLoop() { //nolint:gocognit
- go func() {
- t := time.NewTicker(d.timeout)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- d.clearMu.RLock()
-
- // calculate current time before loop started to be fair
- now := time.Now()
- d.gc.Range(func(key, value interface{}) bool {
- const op = errors.Op("boltdb_plugin_gc")
- k := key.(string)
- v, err := time.Parse(time.RFC3339, value.(string))
- if err != nil {
- return false
- }
-
- if now.After(v) {
- // time expired
- d.gc.Delete(k)
- d.log.Debug("key deleted", "key", k)
- err := d.DB.Update(func(tx *bolt.Tx) error {
- b := tx.Bucket(d.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- err := b.Delete(utils.AsBytes(k))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- d.log.Error("error during the gc phase of update", "error", err)
- return false
- }
- }
- return true
- })
-
- d.clearMu.RUnlock()
- case <-d.stop:
- err := d.DB.Close()
- if err != nil {
- d.log.Error("error")
- }
- return
- }
- }
- }()
-}
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
deleted file mode 100644
index c839130f..00000000
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package boltdb
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/kv"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "boltdb"
- RootPluginName string = "kv"
-)
-
-// Plugin BoltDB K/V storage.
-type Plugin struct {
- cfgPlugin config.Configurer
- // logger
- log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
-}
-
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(RootPluginName) {
- return errors.E(errors.Disabled)
- }
-
- s.stop = make(chan struct{})
- s.log = log
- s.cfgPlugin = cfg
- return nil
-}
-
-// Serve is noop here
-func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
-}
-
-func (s *Plugin) Stop() error {
- if s.drivers > 0 {
- for i := uint(0); i < s.drivers; i++ {
- // send close signal to every driver
- s.stop <- struct{}{}
- }
- }
- return nil
-}
-
-func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_provide")
- st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save driver number to release resources after Stop
- s.drivers++
-
- return st, nil
-}
-
-// Name returns plugin name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (s *Plugin) Available() {}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 53fade97..9a19f96c 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -109,6 +109,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
// at this point we know, that driver field present in the configuration
+ // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs
switch v.(map[string]interface{})[driver] {
case memcached:
if _, ok := p.constructors[memcached]; !ok {
@@ -220,5 +221,4 @@ func (p *Plugin) Name() string {
}
// Available interface implementation
-func (p *Plugin) Available() {
-}
+func (p *Plugin) Available() {}