diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 18:03:48 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-08 18:03:48 +0300 |
commit | 47c40407a7ca5f1391f4d3d504d0def166eac4e9 (patch) | |
tree | 6606bdcdb258cd1138f919ea7fc9a68a40f6bc40 /plugins/kv | |
parent | 49ce25e80ba99ac91bce7ea2b9b632de53e07c0d (diff) |
- Switch from the flatbuffers to the protobuf
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv')
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 17 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 18 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/driver.go | 43 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/driver.go | 20 | ||||
-rw-r--r-- | plugins/kv/interface.go | 14 | ||||
-rw-r--r-- | plugins/kv/payload.proto | 17 | ||||
-rw-r--r-- | plugins/kv/payload/generated/Item.go | 67 | ||||
-rw-r--r-- | plugins/kv/payload/generated/Payload.go | 71 | ||||
-rw-r--r-- | plugins/kv/payload/payload.fbs | 14 | ||||
-rw-r--r-- | plugins/kv/payload/payload.pb.go | 234 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 153 |
12 files changed, 356 insertions, 314 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 0f647cb1..af107dff 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" @@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*payload.Item) error { const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error { // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check // we do not need mutex here, since we use sync.Map - if items[i].TTL != "" { + if items[i].Timeout != "" { // check correctness of provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } // Store key TTL in the separate map - d.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].Timeout) } buf.Reset() @@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("boltdb_driver_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // verify provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } - d.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].Timeout) } return nil } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 02281ed5..9c4689c4 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*payload.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) } for i := range items { - if items[i] == EmptyItem { + if items[i] == nil { return errors.E(op, errors.EmptyItem) } @@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error { } // add additional TTL in case of TTL isn't empty - if items[i].TTL != "" { + if items[i].Timeout != "" { // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) + t, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return err } @@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error { // MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) + t, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index cde84f42..3997e0d4 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -9,8 +9,6 @@ import ( const PluginName = "memcached" -var EmptyItem = kv.Item{} - type Plugin struct { // config plugin cfgPlugin config.Configurer diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index c2494ee7..fae2c831 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(kv.Item).Value), nil + return utils.AsBytes(data.(*payload.Item).Value), nil } return nil, nil } @@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(kv.Item).Value + m[keys[i]] = value.(*payload.Item).Value } } return m, nil } -func (s *Driver) Set(items ...kv.Item) error { +func (s *Driver) Set(items ...*payload.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) } for i := range items { + if items[i] == nil { + continue + } // TTL is set - if items[i].TTL != "" { + if items[i].Timeout != "" { // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return err } @@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...kv.Item) error { +func (s *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // if key exist, overwrite it value - if pItem, ok := s.heap.Load(items[i].Key); ok { + if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } - tmp := pItem.(kv.Item) + tmp := pItem.(*payload.Item) // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, kv.Item{ - Key: items[i].Key, - Value: tmp.Value, - TTL: items[i].TTL, + s.heap.Store(items[i].Key, &payload.Item{ + Key: items[i].Key, + Value: tmp.Value, + Timeout: items[i].Timeout, }) } } @@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(kv.Item).TTL + m[keys[i]] = item.(*payload.Item).Timeout } } return m, nil @@ -209,12 +216,12 @@ func (s *Driver) gc() { case now := <-ticker.C: // check every second s.heap.Range(func(key, value interface{}) bool { - v := value.(kv.Item) - if v.TTL == "" { + v := value.(*payload.Item) + if v.Timeout == "" { return true } - t, err := time.Parse(time.RFC3339, v.TTL) + t, err := time.Parse(time.RFC3339, v.Timeout) if err != nil { return false } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index d0b541b2..5890367b 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -9,11 +9,10 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" ) -var EmptyItem = kv.Item{} - type Driver struct { universalClient redis.UniversalClient log logger.Logger @@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // // Use expiration for `SETEX`-like behavior. // Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*payload.Item) error { const op = errors.Op("redis_driver_set") if items == nil { return errors.E(op, errors.NoKeys) } now := time.Now() for _, item := range items { - if item == EmptyItem { + if item == nil { return errors.E(op, errors.EmptyKey) } - if item.TTL == "" { + if item.Timeout == "" { err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() if err != nil { return err } } else { - t, err := time.Parse(time.RFC3339, item.TTL) + t, err := time.Parse(time.RFC3339, item.Timeout) if err != nil { return err } @@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error { // MExpire https://redis.io/commands/expire // timeout in RFC3339 -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("redis_driver_mexpire") now := time.Now() for _, item := range items { - if item.TTL == "" || strings.TrimSpace(item.Key) == "" { + if item == nil { + continue + } + if item.Timeout == "" || strings.TrimSpace(item.Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } - t, err := time.Parse(time.RFC3339, item.TTL) + t, err := time.Parse(time.RFC3339, item.Timeout) if err != nil { return err } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 20dbb8b3..abcd7f47 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,14 +1,6 @@ package kv -// Item represents general storage item -type Item struct { - // Key of item - Key string - // Value of item - Value string - // live until time provided by TTL in RFC 3339 format - TTL string -} +import "github.com/spiral/roadrunner/v2/plugins/kv/payload" // Storage represents single abstract storage. type Storage interface { @@ -24,10 +16,10 @@ type Storage interface { // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(items ...Item) error + Set(items ...*payload.Item) error // MExpire sets the TTL for multiply keys - MExpire(items ...Item) error + MExpire(items ...*payload.Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto new file mode 100644 index 00000000..66a22ee4 --- /dev/null +++ b/plugins/kv/payload.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package kv.v1; +option go_package = "./payload"; + +message Payload { + // could be an enum in the future + string storage = 1; + repeated Item items = 2; +} + +message Item { + string key = 1; + string value = 2; + // RFC 3339 + string timeout = 3; +} diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go deleted file mode 100644 index 61bd6024..00000000 --- a/plugins/kv/payload/generated/Item.go +++ /dev/null @@ -1,67 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package generated - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Item struct { - _tab flatbuffers.Table -} - -func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Item{} - x.Init(buf, n+offset) - return x -} - -func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Item) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Item) Key() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Item) Value() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Item) Timeout() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func ItemStart(builder *flatbuffers.Builder) { - builder.StartObject(3) -} -func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0) -} -func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0) -} -func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0) -} -func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go deleted file mode 100644 index a2c6cfdb..00000000 --- a/plugins/kv/payload/generated/Payload.go +++ /dev/null @@ -1,71 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package generated - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Payload struct { - _tab flatbuffers.Table -} - -func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Payload{} - x.Init(buf, n+offset) - return x -} - -func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Payload) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Payload) Storage() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Payload) Items(obj *Item, j int) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - x := rcv._tab.Vector(o) - x += flatbuffers.UOffsetT(j) * 4 - x = rcv._tab.Indirect(x) - obj.Init(rcv._tab.Bytes, x) - return true - } - return false -} - -func (rcv *Payload) ItemsLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func PayloadStart(builder *flatbuffers.Builder) { - builder.StartObject(2) -} -func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0) -} -func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0) -} -func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(4, numElems, 4) -} -func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs deleted file mode 100644 index 7e02c1a0..00000000 --- a/plugins/kv/payload/payload.fbs +++ /dev/null @@ -1,14 +0,0 @@ -namespace generated; - -table Payload { - Storage:string; - Items:[Item]; -} - -table Item { - Key:string; - Value:string; - Timeout:string; -} - -root_type Payload; diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go new file mode 100644 index 00000000..60ad9f13 --- /dev/null +++ b/plugins/kv/payload/payload.pb.go @@ -0,0 +1,234 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.16.0 +// source: payload.proto + +package payload + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Payload struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // could be an enum in the future + Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"` + Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` +} + +func (x *Payload) Reset() { + *x = Payload{} + if protoimpl.UnsafeEnabled { + mi := &file_payload_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Payload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Payload) ProtoMessage() {} + +func (x *Payload) ProtoReflect() protoreflect.Message { + mi := &file_payload_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Payload.ProtoReflect.Descriptor instead. +func (*Payload) Descriptor() ([]byte, []int) { + return file_payload_proto_rawDescGZIP(), []int{0} +} + +func (x *Payload) GetStorage() string { + if x != nil { + return x.Storage + } + return "" +} + +func (x *Payload) GetItems() []*Item { + if x != nil { + return x.Items + } + return nil +} + +type Item struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *Item) Reset() { + *x = Item{} + if protoimpl.UnsafeEnabled { + mi := &file_payload_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Item) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Item) ProtoMessage() {} + +func (x *Item) ProtoReflect() protoreflect.Message { + mi := &file_payload_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Item.ProtoReflect.Descriptor instead. +func (*Item) Descriptor() ([]byte, []int) { + return file_payload_proto_rawDescGZIP(), []int{1} +} + +func (x *Item) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *Item) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *Item) GetTimeout() string { + if x != nil { + return x.Timeout + } + return "" +} + +var File_payload_proto protoreflect.FileDescriptor + +var file_payload_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, + 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65, + 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, + 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_payload_proto_rawDescOnce sync.Once + file_payload_proto_rawDescData = file_payload_proto_rawDesc +) + +func file_payload_proto_rawDescGZIP() []byte { + file_payload_proto_rawDescOnce.Do(func() { + file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData) + }) + return file_payload_proto_rawDescData +} + +var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_payload_proto_goTypes = []interface{}{ + (*Payload)(nil), // 0: kv.Payload + (*Item)(nil), // 1: kv.Item +} +var file_payload_proto_depIdxs = []int32{ + 1, // 0: kv.Payload.items:type_name -> kv.Item + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_payload_proto_init() } +func file_payload_proto_init() { + if File_payload_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Payload); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Item); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_payload_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_payload_proto_goTypes, + DependencyIndexes: file_payload_proto_depIdxs, + MessageInfos: file_payload_proto_msgTypes, + }.Build() + File_payload_proto = out.File + file_payload_proto_rawDesc = nil + file_payload_proto_goTypes = nil + file_payload_proto_depIdxs = nil +} diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 2d4babbe..009a5c56 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,9 +2,8 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) // Wrapper for the plugin @@ -17,23 +16,21 @@ type rpc struct { log logger.Logger } -// Has accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Has(in []byte, res *map[string]bool) error { +// Has accept []*payload.Payload proto payload with Storage and Item +func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) + if in.Storage == "" { + return errors.E(op, errors.Str("no storage provided")) + } - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, utils.AsString(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok { + if st, ok := r.storages[in.Storage]; ok { ret, err := st.Has(keys...) if err != nil { return err @@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Set accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Set(in []byte, ok *bool) error { +// Set accept proto payload with Storage and Item +func (r *rpc) Set(in *payload.Payload, ok *bool) error { const op = errors.Op("rpc_set") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - items := make([]Item, 0, dataRoot.ItemsLength()) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - Value: string(tmpItem.Value()), - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.Set(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.Set(in.GetItems()...) if err != nil { return err } @@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MGet accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { +// MGet accept proto payload with Storage and Item +func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} + keys := make([]string, 0, len(in.GetItems())) - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.MGet(keys...) if err != nil { return err @@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MExpire accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MExpire(in []byte, ok *bool) error { +// MExpire accept proto payload with Storage and Item +func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - // when unmarshalling the keys, simultaneously, fill up the slice with items - items := make([]Item, 0, l) - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - // we set up timeout on the keys, so, value here is redundant - Value: "", - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.MExpire(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.MExpire(in.GetItems()...) if err != nil { return errors.E(op, err) } @@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// TTL accept []byte flatbuffers payload with Storage and Item -func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { +// TTL accept proto payload with Storage and Item +func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.TTL(keys...) if err != nil { return err @@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Delete accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Delete(in []byte, ok *bool) error { +// Delete accept proto payload with Storage and Item +func (r *rpc) Delete(in *payload.Payload, ok *bool) error { const op = errors.Op("rcp_delete") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { err := st.Delete(keys...) if err != nil { return errors.E(op, err) @@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } |