summaryrefslogtreecommitdiff
path: root/plugins/kv/drivers
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/drivers')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go17
-rw-r--r--plugins/kv/drivers/memcached/driver.go18
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/driver.go43
-rw-r--r--plugins/kv/drivers/redis/driver.go20
5 files changed, 56 insertions, 44 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 0f647cb1..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error {
// if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
// we do not need mutex here, since we use sync.Map
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
// Store key TTL in the separate map
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
buf.Reset()
@@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
return nil
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 02281ed5..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,6 +6,7 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
- if items[i] == EmptyItem {
+ if items[i] == nil {
return errors.E(op, errors.EmptyItem)
}
@@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error {
}
// add additional TTL in case of TTL isn't empty
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index cde84f42..3997e0d4 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -9,8 +9,6 @@ import (
const PluginName = "memcached"
-var EmptyItem = kv.Item{}
-
type Plugin struct {
// config plugin
cfgPlugin config.Configurer
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index c2494ee7..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(kv.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...kv.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
+ if items[i] == nil {
+ continue
+ }
// TTL is set
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check the TTL in the item
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...kv.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.Load(items[i].Key); ok {
+ if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(kv.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, kv.Item{
- Key: items[i].Key,
- Value: tmp.Value,
- TTL: items[i].TTL,
+ s.heap.Store(items[i].Key, &kvv1.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ Timeout: items[i].Timeout,
})
}
}
@@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(kv.Item).TTL
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -209,12 +216,12 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(kv.Item)
- if v.TTL == "" {
+ v := value.(*kvv1.Item)
+ if v.Timeout == "" {
return true
}
- t, err := time.Parse(time.RFC3339, v.TTL)
+ t, err := time.Parse(time.RFC3339, v.Timeout)
if err != nil {
return false
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index d0b541b2..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,13 +7,12 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-var EmptyItem = kv.Item{}
-
type Driver struct {
universalClient redis.UniversalClient
log logger.Logger
@@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
now := time.Now()
for _, item := range items {
- if item == EmptyItem {
+ if item == nil {
return errors.E(op, errors.EmptyKey)
}
- if item.TTL == "" {
+ if item.Timeout == "" {
err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
if err != nil {
return err
}
} else {
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
@@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
- if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ if item == nil {
+ continue
+ }
+ if item.Timeout == "" || strings.TrimSpace(item.Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}