summaryrefslogtreecommitdiff
path: root/plugins/kv/drivers
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/drivers')
-rw-r--r--plugins/kv/drivers/boltdb/config.go37
-rw-r--r--plugins/kv/drivers/boltdb/driver.go420
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go65
-rw-r--r--plugins/kv/drivers/memcached/config.go12
-rw-r--r--plugins/kv/drivers/memcached/driver.go235
-rw-r--r--plugins/kv/drivers/memcached/plugin.go44
-rw-r--r--plugins/kv/drivers/memory/config.go14
-rw-r--r--plugins/kv/drivers/memory/driver.go229
-rw-r--r--plugins/kv/drivers/memory/plugin.go64
-rw-r--r--plugins/kv/drivers/redis/config.go34
-rw-r--r--plugins/kv/drivers/redis/driver.go239
-rw-r--r--plugins/kv/drivers/redis/plugin.go51
12 files changed, 1444 insertions, 0 deletions
diff --git a/plugins/kv/drivers/boltdb/config.go b/plugins/kv/drivers/boltdb/config.go
new file mode 100644
index 00000000..ebe73c25
--- /dev/null
+++ b/plugins/kv/drivers/boltdb/config.go
@@ -0,0 +1,37 @@
+package boltdb
+
+type Config struct {
+ // Dir is a directory to store the DB files
+ Dir string
+ // File is boltDB file. No need to create it by your own,
+ // boltdb driver is able to create the file, or read existing
+ File string
+ // Bucket to store data in boltDB
+ Bucket string
+ // db file permissions
+ Permissions int
+ // timeout
+ Interval uint `mapstructure:"interval"`
+}
+
+// InitDefaults initializes default values for the boltdb
+func (s *Config) InitDefaults() {
+ if s.Dir == "" {
+ s.Dir = "." // current dir
+ }
+ if s.Bucket == "" {
+ s.Bucket = "rr" // default bucket name
+ }
+
+ if s.File == "" {
+ s.File = "rr.db" // default file name
+ }
+
+ if s.Permissions == 0 {
+ s.Permissions = 777 // free for all
+ }
+
+ if s.Interval == 0 {
+ s.Interval = 60 // default is 60 seconds timeout
+ }
+}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
new file mode 100644
index 00000000..b596d4c3
--- /dev/null
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -0,0 +1,420 @@
+package boltdb
+
+import (
+ "bytes"
+ "encoding/gob"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ bolt "go.etcd.io/bbolt"
+)
+
+type Driver struct {
+ // db instance
+ DB *bolt.DB
+ // name should be UTF-8
+ bucket []byte
+ log logger.Logger
+ cfg *Config
+ // gc contains key which are contain timeouts
+ gc sync.Map
+ // default timeout for cache cleanup is 1 minute
+ timeout time.Duration
+
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+}
+
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ d := &Driver{
+ log: log,
+ stop: stop,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.bucket = []byte(d.cfg.Bucket)
+ d.timeout = time.Duration(d.cfg.Interval) * time.Second
+ d.gc = sync.Map{}
+
+ // add default values
+ d.cfg.InitDefaults()
+
+ db, err := bolt.Open(path.Join(d.cfg.Dir, d.cfg.File), os.FileMode(d.cfg.Permissions), nil)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.DB = db
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.Bucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ go d.startGCLoop()
+
+ return d, nil
+}
+
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("boltdb_driver_has")
+ d.log.Debug("boltdb HAS method called", "args", keys)
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+
+ // this is readable transaction
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ // Get retrieves the value for a key in the bucket.
+ // Returns a nil value if the key does not exist or if the key is a nested bucket.
+ // The returned value is only valid for the life of the transaction.
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ exist := b.Get([]byte(keys[i]))
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.log.Debug("boltdb HAS method finished")
+ return m, nil
+}
+
+// Get retrieves the value for a key in the bucket.
+// Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("boltdb_driver_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ var val []byte
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ val = b.Get([]byte(key))
+
+ // try to decode values
+ if val != nil {
+ buf := bytes.NewReader(val)
+ decoder := gob.NewDecoder(buf)
+
+ var i string
+ err := decoder.Decode(&i)
+ if err != nil {
+ // unsafe (w/o runes) convert
+ return errors.E(op, err)
+ }
+
+ // set the value
+ val = []byte(i)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return val, nil
+}
+
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb_driver_mget")
+ // defense
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ buf := new(bytes.Buffer)
+ var out string
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
+ if value != nil {
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = ""
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return m, nil
+}
+
+// Set puts the K/V to the bolt
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("boltdb_driver_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // start writable transaction
+ tx, err := d.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(d.bucket)
+ // use access by index to avoid copying
+ for i := range items {
+ // performance note: pass a prepared bytes slice with initial cap
+ // we can't move buf and gob out of loop, because we need to clear both from data
+ // but gob will contain (w/o re-init) the past data
+ buf := bytes.Buffer{}
+ encoder := gob.NewEncoder(&buf)
+ if errors.Is(errors.EmptyItem, err) {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
+ err = b.Put([]byte(items[i].Key), buf.Bytes())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
+ // we do not need mutex here, since we use sync.Map
+ if items[i].TTL != "" {
+ // check correctness of provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // Store key TTL in the separate map
+ d.gc.Store(items[i].Key, items[i].TTL)
+ }
+
+ buf.Reset()
+ }
+
+ return nil
+}
+
+// Delete all keys from DB
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("boltdb_driver_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ // start writable transaction
+ tx, err := d.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ for _, key := range keys {
+ err = b.Delete([]byte(key))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("boltdb_driver_mexpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ d.gc.Store(items[i].Key, items[i].TTL)
+ }
+ return nil
+}
+
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb_driver_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := d.gc.Load(keys[i]); ok {
+ // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
+ m[keys[i]] = item.(string)
+ }
+ }
+ return m, nil
+}
+
+// ========================= PRIVATE =================================
+
+func (d *Driver) startGCLoop() { //nolint:gocognit
+ go func() {
+ t := time.NewTicker(d.timeout)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ // calculate current time before loop started to be fair
+ now := time.Now()
+ d.gc.Range(func(key, value interface{}) bool {
+ const op = errors.Op("boltdb_plugin_gc")
+ k := key.(string)
+ v, err := time.Parse(time.RFC3339, value.(string))
+ if err != nil {
+ return false
+ }
+
+ if now.After(v) {
+ // time expired
+ d.gc.Delete(k)
+ d.log.Debug("key deleted", "key", k)
+ err := d.DB.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ err := b.Delete([]byte(k))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ d.log.Error("error during the gc phase of update", "error", err)
+ // todo this error is ignored, it means, that timer still be active
+ // to prevent this, we need to invoke t.Stop()
+ return false
+ }
+ }
+ return true
+ })
+ case <-d.stop:
+ err := d.DB.Close()
+ if err != nil {
+ d.log.Error("error")
+ }
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
new file mode 100644
index 00000000..9d1e0dba
--- /dev/null
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -0,0 +1,65 @@
+package boltdb
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "boltdb"
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+
+ drivers uint
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.stop = make(chan struct{})
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ if s.drivers > 0 {
+ for i := uint(0); i < s.drivers; i++ {
+ // send close signal to every driver
+ s.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ s.drivers++
+
+ return st, nil
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/kv/drivers/memcached/config.go
new file mode 100644
index 00000000..7aad53b6
--- /dev/null
+++ b/plugins/kv/drivers/memcached/config.go
@@ -0,0 +1,12 @@
+package memcached
+
+type Config struct {
+ // Addr is url for memcached, 11211 port is used by default
+ Addr []string
+}
+
+func (s *Config) InitDefaults() {
+ if s.Addr == nil {
+ s.Addr = []string{"localhost:11211"} // default url for memcached
+ }
+}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
new file mode 100644
index 00000000..17b06fa0
--- /dev/null
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -0,0 +1,235 @@
+package memcached
+
+import (
+ "strings"
+ "time"
+
+ "github.com/bradfitz/gomemcache/memcache"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Driver struct {
+ client *memcache.Client
+ log logger.Logger
+ cfg *Config
+}
+
+// NewMemcachedDriver returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_memcached_driver")
+
+ s := &Driver{
+ log: log,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &s.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ s.cfg.InitDefaults()
+
+ m := memcache.New(s.cfg.Addr...)
+ s.client = m
+
+ return s, nil
+}
+
+// Has checks the key for existence
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("memcached_plugin_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool, len(keys))
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ exist, err := d.client.Get(keys[i])
+
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
+ }
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return m, nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("memcached_plugin_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ data, err := d.client.Get(key)
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ return nil, nil
+ }
+ return nil, errors.E(op, err)
+ }
+ if data != nil {
+ // return the value by the key
+ return data.Value, nil
+ }
+ // data is nil by some reason and error also nil
+ return nil, nil
+}
+
+// MGet return map with key -- string
+// and map value as value -- []byte
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached_plugin_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+ for i := range keys {
+ // Here also MultiGet
+ data, err := d.client.Get(keys[i])
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
+ }
+ if data != nil {
+ m[keys[i]] = data.Value
+ }
+ }
+
+ return m, nil
+}
+
+// Set sets the KV pairs. Keys should be 250 bytes maximum
+// TTL:
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("memcached_plugin_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ if items[i] == EmptyItem {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // pre-allocate item
+ memcachedItem := &memcache.Item{
+ Key: items[i].Key,
+ // unsafe convert
+ Value: []byte(items[i].Value),
+ Flags: 0,
+ }
+
+ // add additional TTL in case of TTL isn't empty
+ if items[i].TTL != "" {
+ // verify the TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ memcachedItem.Expiration = int32(t.Unix())
+ }
+
+ err := d.client.Set(memcachedItem)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// MExpire Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("memcached_plugin_mexpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Touch updates the expiry for the given key. The seconds parameter is either
+ // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+ // into the future at which time the item will expire. Zero means the item has
+ // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
+ // The key must be at most 250 bytes in length.
+ err = d.client.Touch(items[i].Key, int32(t.Unix()))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+}
+
+// TTL return time in seconds (int32) for a given keys
+func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached_plugin_ttl")
+ return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
+}
+
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("memcached_plugin_has")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ err := d.client.Delete(keys[i])
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return errors.E(op, err)
+ }
+ }
+ return nil
+}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
new file mode 100644
index 00000000..af59e91b
--- /dev/null
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -0,0 +1,44 @@
+package memcached
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "memcached"
+
+var EmptyItem = kv.Item{}
+
+type Plugin struct {
+ // config plugin
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.cfgPlugin = cfg
+ s.log = log
+ return nil
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}
diff --git a/plugins/kv/drivers/memory/config.go b/plugins/kv/drivers/memory/config.go
new file mode 100644
index 00000000..e51d09c5
--- /dev/null
+++ b/plugins/kv/drivers/memory/config.go
@@ -0,0 +1,14 @@
+package memory
+
+// Config is default config for the in-memory driver
+type Config struct {
+ // Interval for the check
+ Interval int
+}
+
+// InitDefaults by default driver is turned off
+func (c *Config) InitDefaults() {
+ if c.Interval == 0 {
+ c.Interval = 60 // seconds
+ }
+}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
new file mode 100644
index 00000000..1e0d03d4
--- /dev/null
+++ b/plugins/kv/drivers/memory/driver.go
@@ -0,0 +1,229 @@
+package memory
+
+import (
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Driver struct {
+ heap sync.Map
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+ log logger.Logger
+ cfg *Config
+}
+
+func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+ const op = errors.Op("new_in_memory_driver")
+
+ d := &Driver{
+ stop: stop,
+ log: log,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.cfg.InitDefaults()
+
+ go d.gc()
+
+ return d, nil
+}
+
+func (s *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("in_memory_plugin_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if _, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = true
+ }
+ }
+
+ return m, nil
+}
+
+func (s *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("in_memory_plugin_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if data, exist := s.heap.Load(key); exist {
+ // here might be a panic
+ // but data only could be a string, see Set function
+ return []byte(data.(kv.Item).Value), nil
+ }
+ return nil, nil
+}
+
+func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in_memory_plugin_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if value, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = value.(kv.Item).Value
+ }
+ }
+
+ return m, nil
+}
+
+func (s *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("in_memory_plugin_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ // TTL is set
+ if items[i].TTL != "" {
+ // check the TTL in the item
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ }
+
+ s.heap.Store(items[i].Key, items[i])
+ }
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (s *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("in_memory_plugin_mexpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // if key exist, overwrite it value
+ if pItem, ok := s.heap.Load(items[i].Key); ok {
+ // check that time is correct
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ tmp := pItem.(kv.Item)
+ // guess that t is in the future
+ // in memory is just FOR TESTING PURPOSES
+ // LOGIC ISN'T IDEAL
+ s.heap.Store(items[i].Key, kv.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ TTL: items[i].TTL,
+ })
+ }
+ }
+
+ return nil
+}
+
+func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in_memory_plugin_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = item.(kv.Item).TTL
+ }
+ }
+ return m, nil
+}
+
+func (s *Driver) Delete(keys ...string) error {
+ const op = errors.Op("in_memory_plugin_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ s.heap.Delete(keys[i])
+ }
+ return nil
+}
+
+// ================================== PRIVATE ======================================
+
+func (s *Driver) gc() {
+ ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+ for {
+ select {
+ case <-s.stop:
+ ticker.Stop()
+ return
+ case now := <-ticker.C:
+ // check every second
+ s.heap.Range(func(key, value interface{}) bool {
+ v := value.(kv.Item)
+ if v.TTL == "" {
+ return true
+ }
+
+ t, err := time.Parse(time.RFC3339, v.TTL)
+ if err != nil {
+ return false
+ }
+
+ if now.After(t) {
+ s.log.Debug("key deleted", "key", key)
+ s.heap.Delete(key)
+ }
+ return true
+ })
+ }
+ }
+}
diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go
new file mode 100644
index 00000000..acc6023d
--- /dev/null
+++ b/plugins/kv/drivers/memory/plugin.go
@@ -0,0 +1,64 @@
+package memory
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// PluginName is user friendly name for the plugin
+const PluginName = "memory"
+
+type Plugin struct {
+ // heap is user map for the key-value pairs
+ stop chan struct{}
+
+ log logger.Logger
+ cfgPlugin config.Configurer
+ drivers uint
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("in_memory_plugin_init")
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ s.log = log
+ s.cfgPlugin = cfg
+ s.stop = make(chan struct{}, 1)
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ if s.drivers > 0 {
+ for i := uint(0); i < s.drivers; i++ {
+ // send close signal to every driver
+ s.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("inmemory_plugin_provide")
+ st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ s.drivers++
+
+ return st, nil
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go
new file mode 100644
index 00000000..41348236
--- /dev/null
+++ b/plugins/kv/drivers/redis/config.go
@@ -0,0 +1,34 @@
+package redis
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
new file mode 100644
index 00000000..d0b541b2
--- /dev/null
+++ b/plugins/kv/drivers/redis/driver.go
@@ -0,0 +1,239 @@
+package redis
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+var EmptyItem = kv.Item{}
+
+type Driver struct {
+ universalClient redis.UniversalClient
+ log logger.Logger
+ cfg *Config
+}
+
+func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ d := &Driver{
+ log: log,
+ }
+
+ // will be different for every connected driver
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.cfg.InitDefaults()
+ d.log = log
+
+ d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: d.cfg.Addrs,
+ DB: d.cfg.DB,
+ Username: d.cfg.Username,
+ Password: d.cfg.Password,
+ SentinelPassword: d.cfg.SentinelPassword,
+ MaxRetries: d.cfg.MaxRetries,
+ MinRetryBackoff: d.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: d.cfg.MaxRetryBackoff,
+ DialTimeout: d.cfg.DialTimeout,
+ ReadTimeout: d.cfg.ReadTimeout,
+ WriteTimeout: d.cfg.WriteTimeout,
+ PoolSize: d.cfg.PoolSize,
+ MinIdleConns: d.cfg.MinIdleConns,
+ MaxConnAge: d.cfg.MaxConnAge,
+ PoolTimeout: d.cfg.PoolTimeout,
+ IdleTimeout: d.cfg.IdleTimeout,
+ IdleCheckFrequency: d.cfg.IdleCheckFreq,
+ ReadOnly: d.cfg.ReadOnly,
+ RouteByLatency: d.cfg.RouteByLatency,
+ RouteRandomly: d.cfg.RouteRandomly,
+ MasterName: d.cfg.MasterName,
+ })
+
+ return d, nil
+}
+
+// Has checks if value exists.
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("redis_driver_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ exist, err := d.universalClient.Exists(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+ if exist == 1 {
+ m[key] = true
+ }
+ }
+ return m, nil
+}
+
+// Get loads key content into slice.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("redis_driver_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ return d.universalClient.Get(context.Background(), key).Bytes()
+}
+
+// MGet loads content of multiple values (some values might be skipped).
+// https://redis.io/commands/mget
+// Returns slice with the interfaces with values
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, k := range keys {
+ cmd := d.universalClient.Get(context.Background(), k)
+ if cmd.Err() != nil {
+ if cmd.Err() == redis.Nil {
+ continue
+ }
+ return nil, errors.E(op, cmd.Err())
+ }
+
+ m[k] = cmd.Val()
+ }
+
+ return m, nil
+}
+
+// Set sets value with the TTL in seconds
+// https://redis.io/commands/set
+// Redis `SET key value [expiration]` command.
+//
+// Use expiration for `SETEX`-like behavior.
+// Zero expiration means the key has no expiration time.
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+ now := time.Now()
+ for _, item := range items {
+ if item == EmptyItem {
+ return errors.E(op, errors.EmptyKey)
+ }
+
+ if item.TTL == "" {
+ err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
+ if err != nil {
+ return err
+ }
+ } else {
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+ err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// Delete one or multiple keys.
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("redis_driver_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+ return d.universalClient.Del(context.Background(), keys...).Err()
+}
+
+// MExpire https://redis.io/commands/expire
+// timeout in RFC3339
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_mexpire")
+ now := time.Now()
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+
+ // t guessed to be in future
+ // for Redis we use t.Sub, it will result in seconds, like 4.2s
+ d.universalClient.Expire(context.Background(), item.Key, t.Sub(now))
+ }
+
+ return nil
+}
+
+// TTL https://redis.io/commands/ttl
+// return time in seconds (float64) for a given keys
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ duration, err := d.universalClient.TTL(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = duration.Seconds()
+ }
+ return m, nil
+}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
new file mode 100644
index 00000000..d2183411
--- /dev/null
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -0,0 +1,51 @@
+package redis
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "redis"
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("redis_plugin_provide")
+ st, err := NewRedisDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}