summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
committerValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
commitcc271dceb13d3929f0382311dfce3dfed2ce04ce (patch)
tree13c4c3f380d8309b95c9600cc2000d1d5ab87cda /plugins/kv
parenta8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff)
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go6
-rw-r--r--plugins/kv/drivers/memcached/driver.go6
-rw-r--r--plugins/kv/drivers/memory/driver.go18
-rw-r--r--plugins/kv/drivers/redis/driver.go6
-rw-r--r--plugins/kv/interface.go6
-rw-r--r--plugins/kv/payload.proto17
-rw-r--r--plugins/kv/payload/payload.pb.go234
-rw-r--r--plugins/kv/rpc.go14
8 files changed, 28 insertions, 279 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index af107dff..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,9 +10,9 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
@@ -214,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -324,7 +324,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 9c4689c4..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,9 +6,9 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -135,7 +135,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -176,7 +176,7 @@ func (d *Driver) Set(items ...*payload.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
if items[i] == nil {
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index fae2c831..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -72,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(*payload.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -95,14 +95,14 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(*payload.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...*payload.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -128,7 +128,7 @@ func (s *Driver) Set(items ...*payload.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*payload.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -145,11 +145,11 @@ func (s *Driver) MExpire(items ...*payload.Item) error {
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(*payload.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &payload.Item{
+ s.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -178,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(*payload.Item).Timeout
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -216,7 +216,7 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(*payload.Item)
+ v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index 5890367b..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,9 +7,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -138,7 +138,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -187,7 +187,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index abcd7f47..7841f9a2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,6 +1,6 @@
package kv
-import "github.com/spiral/roadrunner/v2/plugins/kv/payload"
+import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -16,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...*payload.Item) error
+ Set(items ...*kvv1.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...*payload.Item) error
+ MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto
deleted file mode 100644
index 66a22ee4..00000000
--- a/plugins/kv/payload.proto
+++ /dev/null
@@ -1,17 +0,0 @@
-syntax = "proto3";
-
-package kv.v1;
-option go_package = "./payload";
-
-message Payload {
- // could be an enum in the future
- string storage = 1;
- repeated Item items = 2;
-}
-
-message Item {
- string key = 1;
- string value = 2;
- // RFC 3339
- string timeout = 3;
-}
diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go
deleted file mode 100644
index 60ad9f13..00000000
--- a/plugins/kv/payload/payload.pb.go
+++ /dev/null
@@ -1,234 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.16.0
-// source: payload.proto
-
-package payload
-
-import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
- reflect "reflect"
- sync "sync"
-)
-
-const (
- // Verify that this generated code is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
- // Verify that runtime/protoimpl is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
-)
-
-type Payload struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- // could be an enum in the future
- Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"`
- Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
-}
-
-func (x *Payload) Reset() {
- *x = Payload{}
- if protoimpl.UnsafeEnabled {
- mi := &file_payload_proto_msgTypes[0]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Payload) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Payload) ProtoMessage() {}
-
-func (x *Payload) ProtoReflect() protoreflect.Message {
- mi := &file_payload_proto_msgTypes[0]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
-func (*Payload) Descriptor() ([]byte, []int) {
- return file_payload_proto_rawDescGZIP(), []int{0}
-}
-
-func (x *Payload) GetStorage() string {
- if x != nil {
- return x.Storage
- }
- return ""
-}
-
-func (x *Payload) GetItems() []*Item {
- if x != nil {
- return x.Items
- }
- return nil
-}
-
-type Item struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
- Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
- Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
-}
-
-func (x *Item) Reset() {
- *x = Item{}
- if protoimpl.UnsafeEnabled {
- mi := &file_payload_proto_msgTypes[1]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Item) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Item) ProtoMessage() {}
-
-func (x *Item) ProtoReflect() protoreflect.Message {
- mi := &file_payload_proto_msgTypes[1]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Item.ProtoReflect.Descriptor instead.
-func (*Item) Descriptor() ([]byte, []int) {
- return file_payload_proto_rawDescGZIP(), []int{1}
-}
-
-func (x *Item) GetKey() string {
- if x != nil {
- return x.Key
- }
- return ""
-}
-
-func (x *Item) GetValue() string {
- if x != nil {
- return x.Value
- }
- return ""
-}
-
-func (x *Item) GetTimeout() string {
- if x != nil {
- return x.Timeout
- }
- return ""
-}
-
-var File_payload_proto protoreflect.FileDescriptor
-
-var file_payload_proto_rawDesc = []byte{
- 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
- 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18,
- 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d,
- 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65,
- 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d,
- 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
- 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
- 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
- 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62,
- 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
-}
-
-var (
- file_payload_proto_rawDescOnce sync.Once
- file_payload_proto_rawDescData = file_payload_proto_rawDesc
-)
-
-func file_payload_proto_rawDescGZIP() []byte {
- file_payload_proto_rawDescOnce.Do(func() {
- file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData)
- })
- return file_payload_proto_rawDescData
-}
-
-var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
-var file_payload_proto_goTypes = []interface{}{
- (*Payload)(nil), // 0: kv.Payload
- (*Item)(nil), // 1: kv.Item
-}
-var file_payload_proto_depIdxs = []int32{
- 1, // 0: kv.Payload.items:type_name -> kv.Item
- 1, // [1:1] is the sub-list for method output_type
- 1, // [1:1] is the sub-list for method input_type
- 1, // [1:1] is the sub-list for extension type_name
- 1, // [1:1] is the sub-list for extension extendee
- 0, // [0:1] is the sub-list for field type_name
-}
-
-func init() { file_payload_proto_init() }
-func file_payload_proto_init() {
- if File_payload_proto != nil {
- return
- }
- if !protoimpl.UnsafeEnabled {
- file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Payload); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Item); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- }
- type x struct{}
- out := protoimpl.TypeBuilder{
- File: protoimpl.DescBuilder{
- GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
- RawDescriptor: file_payload_proto_rawDesc,
- NumEnums: 0,
- NumMessages: 2,
- NumExtensions: 0,
- NumServices: 0,
- },
- GoTypes: file_payload_proto_goTypes,
- DependencyIndexes: file_payload_proto_depIdxs,
- MessageInfos: file_payload_proto_msgTypes,
- }.Build()
- File_payload_proto = out.File
- file_payload_proto_rawDesc = nil
- file_payload_proto_goTypes = nil
- file_payload_proto_depIdxs = nil
-}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 009a5c56..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,7 +2,7 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -17,7 +17,7 @@ type rpc struct {
}
// Has accept []*payload.Payload proto payload with Storage and Item
-func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
if in.Storage == "" {
@@ -46,7 +46,7 @@ func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
}
// Set accept proto payload with Storage and Item
-func (r *rpc) Set(in *payload.Payload, ok *bool) error {
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -65,7 +65,7 @@ func (r *rpc) Set(in *payload.Payload, ok *bool) error {
}
// MGet accept proto payload with Storage and Item
-func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
keys := make([]string, 0, len(in.GetItems()))
@@ -89,7 +89,7 @@ func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
}
// MExpire accept proto payload with Storage and Item
-func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -108,7 +108,7 @@ func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
}
// TTL accept proto payload with Storage and Item
-func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
keys := make([]string, 0, len(in.GetItems()))
@@ -131,7 +131,7 @@ func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
}
// Delete accept proto payload with Storage and Item
-func (r *rpc) Delete(in *payload.Payload, ok *bool) error {
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
keys := make([]string, 0, len(in.GetItems()))