diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
commit | cc271dceb13d3929f0382311dfce3dfed2ce04ce (patch) | |
tree | 13c4c3f380d8309b95c9600cc2000d1d5ab87cda /plugins/kv | |
parent | a8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff) |
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv')
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 6 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 6 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/driver.go | 18 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/driver.go | 6 | ||||
-rw-r--r-- | plugins/kv/interface.go | 6 | ||||
-rw-r--r-- | plugins/kv/payload.proto | 17 | ||||
-rw-r--r-- | plugins/kv/payload/payload.pb.go | 234 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 14 |
8 files changed, 28 insertions, 279 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index af107dff..ba873513 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -10,9 +10,9 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" @@ -214,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -324,7 +324,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_mexpire") for i := range items { if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 9c4689c4..8ea515d0 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,9 +6,9 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -135,7 +135,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -176,7 +176,7 @@ func (d *Driver) Set(items ...*payload.Item) error { // MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { if items[i] == nil { diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index fae2c831..3158adee 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -72,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(*payload.Item).Value), nil + return utils.AsBytes(data.(*kvv1.Item).Value), nil } return nil, nil } @@ -95,14 +95,14 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*payload.Item).Value + m[keys[i]] = value.(*kvv1.Item).Value } } return m, nil } -func (s *Driver) Set(items ...*payload.Item) error { +func (s *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -128,7 +128,7 @@ func (s *Driver) Set(items ...*payload.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*payload.Item) error { +func (s *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i] == nil { @@ -145,11 +145,11 @@ func (s *Driver) MExpire(items ...*payload.Item) error { if err != nil { return errors.E(op, err) } - tmp := pItem.(*payload.Item) + tmp := pItem.(*kvv1.Item) // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &payload.Item{ + s.heap.Store(items[i].Key, &kvv1.Item{ Key: items[i].Key, Value: tmp.Value, Timeout: items[i].Timeout, @@ -178,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*payload.Item).Timeout + m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil @@ -216,7 +216,7 @@ func (s *Driver) gc() { case now := <-ticker.C: // check every second s.heap.Range(func(key, value interface{}) bool { - v := value.(*payload.Item) + v := value.(*kvv1.Item) if v.Timeout == "" { return true } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index 5890367b..0aaa6352 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -7,9 +7,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -138,7 +138,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // // Use expiration for `SETEX`-like behavior. // Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -187,7 +187,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire https://redis.io/commands/expire // timeout in RFC3339 -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_mexpire") now := time.Now() for _, item := range items { diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index abcd7f47..7841f9a2 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,6 +1,6 @@ package kv -import "github.com/spiral/roadrunner/v2/plugins/kv/payload" +import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" // Storage represents single abstract storage. type Storage interface { @@ -16,10 +16,10 @@ type Storage interface { // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(items ...*payload.Item) error + Set(items ...*kvv1.Item) error // MExpire sets the TTL for multiply keys - MExpire(items ...*payload.Item) error + MExpire(items ...*kvv1.Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto deleted file mode 100644 index 66a22ee4..00000000 --- a/plugins/kv/payload.proto +++ /dev/null @@ -1,17 +0,0 @@ -syntax = "proto3"; - -package kv.v1; -option go_package = "./payload"; - -message Payload { - // could be an enum in the future - string storage = 1; - repeated Item items = 2; -} - -message Item { - string key = 1; - string value = 2; - // RFC 3339 - string timeout = 3; -} diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go deleted file mode 100644 index 60ad9f13..00000000 --- a/plugins/kv/payload/payload.pb.go +++ /dev/null @@ -1,234 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 -// source: payload.proto - -package payload - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Payload struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // could be an enum in the future - Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"` - Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` -} - -func (x *Payload) Reset() { - *x = Payload{} - if protoimpl.UnsafeEnabled { - mi := &file_payload_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Payload) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Payload) ProtoMessage() {} - -func (x *Payload) ProtoReflect() protoreflect.Message { - mi := &file_payload_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Payload.ProtoReflect.Descriptor instead. -func (*Payload) Descriptor() ([]byte, []int) { - return file_payload_proto_rawDescGZIP(), []int{0} -} - -func (x *Payload) GetStorage() string { - if x != nil { - return x.Storage - } - return "" -} - -func (x *Payload) GetItems() []*Item { - if x != nil { - return x.Items - } - return nil -} - -type Item struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` -} - -func (x *Item) Reset() { - *x = Item{} - if protoimpl.UnsafeEnabled { - mi := &file_payload_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Item) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Item) ProtoMessage() {} - -func (x *Item) ProtoReflect() protoreflect.Message { - mi := &file_payload_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Item.ProtoReflect.Descriptor instead. -func (*Item) Descriptor() ([]byte, []int) { - return file_payload_proto_rawDescGZIP(), []int{1} -} - -func (x *Item) GetKey() string { - if x != nil { - return x.Key - } - return "" -} - -func (x *Item) GetValue() string { - if x != nil { - return x.Value - } - return "" -} - -func (x *Item) GetTimeout() string { - if x != nil { - return x.Timeout - } - return "" -} - -var File_payload_proto protoreflect.FileDescriptor - -var file_payload_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65, - 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_payload_proto_rawDescOnce sync.Once - file_payload_proto_rawDescData = file_payload_proto_rawDesc -) - -func file_payload_proto_rawDescGZIP() []byte { - file_payload_proto_rawDescOnce.Do(func() { - file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData) - }) - return file_payload_proto_rawDescData -} - -var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_payload_proto_goTypes = []interface{}{ - (*Payload)(nil), // 0: kv.Payload - (*Item)(nil), // 1: kv.Item -} -var file_payload_proto_depIdxs = []int32{ - 1, // 0: kv.Payload.items:type_name -> kv.Item - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_payload_proto_init() } -func file_payload_proto_init() { - if File_payload_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Payload); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Item); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_payload_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_payload_proto_goTypes, - DependencyIndexes: file_payload_proto_depIdxs, - MessageInfos: file_payload_proto_msgTypes, - }.Build() - File_payload_proto = out.File - file_payload_proto_rawDesc = nil - file_payload_proto_goTypes = nil - file_payload_proto_depIdxs = nil -} diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 009a5c56..a9efe0c4 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,7 +2,7 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -17,7 +17,7 @@ type rpc struct { } // Has accept []*payload.Payload proto payload with Storage and Item -func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { +func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") if in.Storage == "" { @@ -46,7 +46,7 @@ func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { } // Set accept proto payload with Storage and Item -func (r *rpc) Set(in *payload.Payload, ok *bool) error { +func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_set") if st, exists := r.storages[in.GetStorage()]; exists { @@ -65,7 +65,7 @@ func (r *rpc) Set(in *payload.Payload, ok *bool) error { } // MGet accept proto payload with Storage and Item -func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { +func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") keys := make([]string, 0, len(in.GetItems())) @@ -89,7 +89,7 @@ func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { } // MExpire accept proto payload with Storage and Item -func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { +func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") if st, exists := r.storages[in.GetStorage()]; exists { @@ -108,7 +108,7 @@ func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { } // TTL accept proto payload with Storage and Item -func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { +func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") keys := make([]string, 0, len(in.GetItems())) @@ -131,7 +131,7 @@ func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { } // Delete accept proto payload with Storage and Item -func (r *rpc) Delete(in *payload.Payload, ok *bool) error { +func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rcp_delete") keys := make([]string, 0, len(in.GetItems())) |