summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/config.go5
-rw-r--r--plugins/kv/doc/kv.drawio1
-rw-r--r--plugins/kv/drivers/boltdb/config.go (renamed from plugins/kv/boltdb/config.go)0
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go (renamed from plugins/kv/boltdb/plugin.go)66
-rw-r--r--plugins/kv/drivers/boltdb/plugin_unit_test.go (renamed from plugins/kv/boltdb/plugin_unit_test.go)0
-rw-r--r--plugins/kv/drivers/memcached/config.go (renamed from plugins/kv/memcached/config.go)0
-rw-r--r--plugins/kv/drivers/memcached/plugin.go (renamed from plugins/kv/memcached/plugin.go)11
-rw-r--r--plugins/kv/drivers/memcached/plugin_unit_test.go (renamed from plugins/kv/memcached/plugin_unit_test.go)0
-rw-r--r--plugins/kv/drivers/memory/config.go (renamed from plugins/kv/memory/config.go)0
-rw-r--r--plugins/kv/drivers/memory/plugin.go (renamed from plugins/kv/memory/plugin.go)9
-rw-r--r--plugins/kv/drivers/memory/plugin_unit_test.go (renamed from plugins/kv/memory/plugin_unit_test.go)0
-rw-r--r--plugins/kv/interface.go4
-rw-r--r--plugins/kv/payload/generated/Item.go67
-rw-r--r--plugins/kv/payload/generated/Payload.go71
-rw-r--r--plugins/kv/payload/payload.fbs14
-rw-r--r--plugins/kv/rpc.go198
-rw-r--r--plugins/kv/storage.go176
17 files changed, 500 insertions, 122 deletions
diff --git a/plugins/kv/config.go b/plugins/kv/config.go
new file mode 100644
index 00000000..9ecae644
--- /dev/null
+++ b/plugins/kv/config.go
@@ -0,0 +1,5 @@
+package kv
+
+type Config struct {
+ Data map[string]interface{} `mapstructure:"kv"`
+}
diff --git a/plugins/kv/doc/kv.drawio b/plugins/kv/doc/kv.drawio
new file mode 100644
index 00000000..43afcd2e
--- /dev/null
+++ b/plugins/kv/doc/kv.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-04-21T18:25:55.980Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.114 Electron/12.0.4 Safari/537.36" etag="AVybp6E5fJIGIqD54uYB" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7VldU6MwFP01fdQBAhQfW4q6M1ad1pnVxwhZiJuSbkg/8NdvKEmBxtY6tWJnfCr35IPknHOTO7QD/MnyisFpMqQRIh3LiJYdMOhYltn1DPFTILlEDM8rkZjhSGIVMMavSHWU6AxHKGt05JQSjqdNMKRpikLewCBjdNHs9oeS5lunMEYaMA4h0dHfOOJJidquV2u4RjhO5KsBMOTKJ1D1lkCWwIguahAIOsBnlPLyabL0ESnoU8SU4y63tK4XwFDK9xkwz4eLkN+mLyF4DqK//vwS/ztz5dp4rnaMIkGADCnjCY1pCklQoX1GZ2mEilkNEVV9biidCtAU4AviPJdqwhmnAkr4hMhWtMT8sfb8VEx17shosJQzr4JcBSln+WM9qI0qwmrYKlLjyv0Vm9pKm4QyOmMh2sGV8h9kMeI7+tlrcUVeIDpBYj1iHEMEcjxvrgNKf8brfpWC4kGK+AFBu6ciaAvCOG0K452KMCeUaaBNQU27TUUrFZ9qLV+TagcwLofeUyzebBmyULDVdS/LBBcYzSlKJ8hRG7qtl3GAlOXb5pDM5BbGD3ej3lWgKdzUb5FgjsZTuKJsIUqgplZyVsQ4Wu5mXmdUDgBOkxlbMbOoqhFTVRhJrRBxje0iHGR7S+NKT4M06hW1l4hCArMMhxsePvoJ8+7JsYX1GqvOG6Qq7EC7m90NUTfFKtNQs7s20bq2VBM5X5w3b2j/c6ntuqz2sOZFm5ca0LK7f3fzMOi3fhA6dtPqTusHobqzfsy/h/ntUzC/rZn/1+3ZMBjejZ6+nf9B+/43NVJ+/L+NLOcU/O9o/hfm93v+dTD4dv5vvxA2T+bb2Sf6+FB/bimITfedivjIheyF5vzRvd+65z3v23lerw9HKMYZRyzT6BIb501OIMFxKp5DQQBiAijowSEkPdkwwVFU5gnK8Ct8Xk1VkD0tlF/txul3nEExl0iNrMyST6K7e/E+3d032LaOxrZ+IJ8zdp5DQajlkoLcZyaeYr4moEb/13+ccK09/WkfjbGuxljrWawfbkdMYxFW/yiVp2P1zxwI/gM=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/drivers/boltdb/config.go
index ebe73c25..ebe73c25 100644
--- a/plugins/kv/boltdb/config.go
+++ b/plugins/kv/drivers/boltdb/config.go
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index ffcbc85a..31194af6 100644
--- a/plugins/kv/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -18,15 +18,16 @@ import (
const PluginName = "boltdb"
-// BoltDB K/V storage.
+// Plugin BoltDB K/V storage.
type Plugin struct {
// db instance
DB *bolt.DB
// name should be UTF-8
bucket []byte
- // config for RR integration
- cfg *Config
+ // boltdb configuration
+ cfg *Config
+ cfgPlugin config.Configurer
// logger
log logger.Logger
@@ -41,26 +42,42 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- const op = errors.Op("boltdb_plugin_init")
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
+func (s *Plugin) Stop() error {
+ const op = errors.Op("boltdb_plugin_stop")
+ if s.DB != nil {
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
}
+ return nil
+}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+func (s *Plugin) Configure(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_configure")
+
+ err := s.cfgPlugin.UnmarshalKey(key, &s.cfg)
if err != nil {
- return errors.E(op, errors.Disabled, err)
+ return nil, errors.E(op, err)
}
// add default values
s.cfg.InitDefaults()
- // set the logger
- s.log = log
-
db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
if err != nil {
- return errors.E(op, err)
+ return nil, errors.E(op, err)
}
// create bucket if it does not exist
@@ -75,7 +92,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
})
if err != nil {
- return errors.E(op, err)
+ return nil, errors.E(op, err)
}
s.DB = db
@@ -84,24 +101,10 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
s.timeout = time.Duration(s.cfg.Interval) * time.Second
s.gc = &sync.Map{}
- return nil
-}
-
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- // start the TTL gc
+ // start the GC phase
go s.gcPhase()
- return errCh
-}
-
-func (s *Plugin) Stop() error {
- const op = errors.Op("boltdb_plugin_stop")
- err := s.Close()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
+ return s, nil
}
func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
@@ -397,11 +400,6 @@ func (s *Plugin) Close() error {
return s.DB.Close()
}
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
// Name returns plugin name
func (s *Plugin) Name() string {
return PluginName
diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/drivers/boltdb/plugin_unit_test.go
index ad3843e7..ad3843e7 100644
--- a/plugins/kv/boltdb/plugin_unit_test.go
+++ b/plugins/kv/drivers/boltdb/plugin_unit_test.go
diff --git a/plugins/kv/memcached/config.go b/plugins/kv/drivers/memcached/config.go
index 7aad53b6..7aad53b6 100644
--- a/plugins/kv/memcached/config.go
+++ b/plugins/kv/drivers/memcached/config.go
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index b8392f9e..496042a6 100644
--- a/plugins/kv/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -56,21 +56,20 @@ func (s *Plugin) Serve() chan error {
return errCh
}
-// Memcached has no stop/close or smt similar to close the connection
+// Stop Memcached has no stop/close or smt similar to close the connection
func (s *Plugin) Stop() error {
return nil
}
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
// Name returns plugin user-friendly name
func (s *Plugin) Name() string {
return PluginName
}
+func (s *Plugin) Configure(key string) (kv.Storage, error) {
+ return s, nil
+}
+
// Has checks the key for existence
func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("memcached_plugin_has")
diff --git a/plugins/kv/memcached/plugin_unit_test.go b/plugins/kv/drivers/memcached/plugin_unit_test.go
index 31423627..31423627 100644
--- a/plugins/kv/memcached/plugin_unit_test.go
+++ b/plugins/kv/drivers/memcached/plugin_unit_test.go
diff --git a/plugins/kv/memory/config.go b/plugins/kv/drivers/memory/config.go
index e51d09c5..e51d09c5 100644
--- a/plugins/kv/memory/config.go
+++ b/plugins/kv/drivers/memory/config.go
diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go
index 4201a1c0..73527b97 100644
--- a/plugins/kv/memory/plugin.go
+++ b/plugins/kv/drivers/memory/plugin.go
@@ -57,6 +57,10 @@ func (s *Plugin) Stop() error {
return nil
}
+func (s *Plugin) Configure(key string) (kv.Storage, error) {
+ return s, nil
+}
+
func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in_memory_plugin_has")
if keys == nil {
@@ -220,11 +224,6 @@ func (s *Plugin) Close() error {
return nil
}
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
// Name returns plugin user-friendly name
func (s *Plugin) Name() string {
return PluginName
diff --git a/plugins/kv/memory/plugin_unit_test.go b/plugins/kv/drivers/memory/plugin_unit_test.go
index 1965a696..1965a696 100644
--- a/plugins/kv/memory/plugin_unit_test.go
+++ b/plugins/kv/drivers/memory/plugin_unit_test.go
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index c1367cdf..6c2a66f2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -38,4 +38,8 @@ type Storage interface {
// Close closes the storage and underlying resources.
Close() error
+
+ // Configure used to configure storage
+ // key - yaml config key, for example kv.boltdb-north
+ Configure(key string) (Storage, error)
}
diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go
new file mode 100644
index 00000000..61bd6024
--- /dev/null
+++ b/plugins/kv/payload/generated/Item.go
@@ -0,0 +1,67 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package generated
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Item struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Item{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Item) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Item) Key() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Item) Value() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Item) Timeout() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func ItemStart(builder *flatbuffers.Builder) {
+ builder.StartObject(3)
+}
+func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0)
+}
+func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0)
+}
+func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0)
+}
+func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go
new file mode 100644
index 00000000..a2c6cfdb
--- /dev/null
+++ b/plugins/kv/payload/generated/Payload.go
@@ -0,0 +1,71 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package generated
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Payload struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Payload{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Payload) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Payload) Storage() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Payload) Items(obj *Item, j int) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *Payload) ItemsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func PayloadStart(builder *flatbuffers.Builder) {
+ builder.StartObject(2)
+}
+func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0)
+}
+func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0)
+}
+func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs
new file mode 100644
index 00000000..7e02c1a0
--- /dev/null
+++ b/plugins/kv/payload/payload.fbs
@@ -0,0 +1,14 @@
+namespace generated;
+
+table Payload {
+ Storage:string;
+ Items:[Item];
+}
+
+table Item {
+ Key:string;
+ Value:string;
+ Timeout:string;
+}
+
+root_type Payload;
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 751f0d12..69b91981 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -1,110 +1,154 @@
package kv
import (
+ "unsafe"
+
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
// Wrapper for the plugin
-type RPCServer struct {
+type rpc struct {
+ // all available storages
+ storages map[string]Storage
// svc is a plugin implementing Storage interface
- svc Storage
+ srv *Plugin
// Logger
log logger.Logger
}
-// NewRPCServer construct RPC server for the particular plugin
-func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
- return &RPCServer{
- svc: srv,
- log: log,
- }
-}
-
-// data Data
-func (r *RPCServer) Has(in []string, res *map[string]bool) error {
- const op = errors.Op("rpc server Has")
- ret, err := r.svc.Has(in...)
- if err != nil {
- return errors.E(op, err)
- }
-
- // update the value in the pointer
- *res = ret
- return nil
-}
+// Has accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Has(in []byte, res *map[string]bool) error {
+ const op = errors.Op("rpc_has")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
-// in SetData
-func (r *RPCServer) Set(in []Item, ok *bool) error {
- const op = errors.Op("rpc server Set")
+ keys := make([]string, 0, l)
- err := r.svc.Set(in...)
- if err != nil {
- return errors.E(op, err)
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, strConvert(tmpItem.Key()))
}
- *ok = true
- return nil
-}
+ if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok {
+ ret, err := st.Has(keys...)
+ if err != nil {
+ return err
+ }
-// in Data
-func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server MGet")
- ret, err := r.svc.MGet(in...)
- if err != nil {
- return errors.E(op, err)
+ // update the value in the pointer
+ // save the result
+ *res = ret
+ return nil
}
- // update return value
- *res = ret
- return nil
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) MExpire(in []Item, ok *bool) error {
- const op = errors.Op("rpc server MExpire")
+//Set accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Set(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_set")
- err := r.svc.MExpire(in...)
- if err != nil {
- return errors.E(op, err)
- }
+ dataRoot := generated.GetRootAsPayload(in, 0)
- *ok = true
- return nil
-}
+ l := dataRoot.ItemsLength()
+ items := make([]Item, 0, dataRoot.ItemsLength())
+ tmpItem := &generated.Item{}
-// in Data
-func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server TTL")
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
- ret, err := r.svc.TTL(in...)
- if err != nil {
- return errors.E(op, err)
+ itc := Item{
+ Key: string(tmpItem.Key()),
+ Value: string(tmpItem.Value()),
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
}
- *res = ret
- return nil
-}
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Set(items...)
+ if err != nil {
+ return err
+ }
-// in Data
-func (r *RPCServer) Delete(in []string, ok *bool) error {
- const op = errors.Op("rpc server Delete")
- err := r.svc.Delete(in...)
- if err != nil {
- return errors.E(op, err)
+ // save the result
+ *ok = true
+ return nil
}
- *ok = true
- return nil
-}
-// in string, storages
-func (r *RPCServer) Close(storage string, ok *bool) error {
- const op = errors.Op("rpc server Close")
- err := r.svc.Close()
- if err != nil {
- return errors.E(op, err)
- }
- *ok = true
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
- return nil
+// MGet accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) MGet(in []string, res *map[string]interface{}) error {
+// const op = errors.Op("rpc server MGet")
+// ret, err := r.svc.MGet(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+//
+// // update return value
+// *res = ret
+// return nil
+//}
+
+//// MExpire accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) MExpire(in []Item, ok *bool) error {
+// const op = errors.Op("rpc server MExpire")
+//
+// err := r.svc.MExpire(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+//
+// *ok = true
+// return nil
+//}
+//
+//// TTL accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) TTL(in []string, res *map[string]interface{}) error {
+// const op = errors.Op("rpc server TTL")
+//
+// ret, err := r.svc.TTL(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+//
+// *res = ret
+// return nil
+//}
+//
+//// Delete accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) Delete(in []string, ok *bool) error {
+// const op = errors.Op("rpc server Delete")
+// err := r.svc.Delete(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+// *ok = true
+// return nil
+//}
+//
+//// Close closes the storage connection
+//func (r *rpc) Close(storage string, ok *bool) error {
+// const op = errors.Op("rpc server Close")
+// err := r.svc.Close()
+// if err != nil {
+// return errors.E(op, err)
+// }
+// *ok = true
+//
+// return nil
+//}
+
+func strConvert(s []byte) string {
+ return *(*string)(unsafe.Pointer(&s))
}
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
new file mode 100644
index 00000000..81ca2d91
--- /dev/null
+++ b/plugins/kv/storage.go
@@ -0,0 +1,176 @@
+package kv
+
+import (
+ "fmt"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName string = "kv"
+
+const (
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ memcached string = "memcached"
+ boltdb string = "boltdb"
+ redis string = "redis"
+ memory string = "memory"
+)
+
+// Plugin for the unified storage
+type Plugin struct {
+ log logger.Logger
+ // drivers contains general storage drivers, such as boltdb, memory, memcached, redis.
+ drivers map[string]Storage
+ // storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
+ storages map[string]Storage
+ // KV configuration
+ cfg Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("kv_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ p.drivers = make(map[string]Storage, 5)
+ p.storages = make(map[string]Storage, 5)
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ const op = errors.Op("kv_plugin_serve")
+ // key - storage name in the config
+ // value - storage
+ /*
+ For example we can have here 2 storages (but they are not pre-configured)
+ for the boltdb and memcached
+ We should provide here the actual configs for the all requested storages
+ kv:
+ default:
+ driver: memory
+
+ boltdb-south:
+ driver: boltdb
+ dir: "tests/rr-bolt"
+ file: "rr.db"
+ bucket: "rr"
+ permissions: 777
+ ttl: 40s
+
+ boltdb-north:
+ driver: boltdb
+ dir: "tests/rr-bolt"
+ file: "rr.db"
+ bucket: "rr"
+ permissions: 777
+ ttl: 40s
+
+ memcached:
+ driver: memcached
+ addr: [ "localhost:11211" ]
+
+
+ For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ when user requests for example boltdb-south, we should provide that particular preconfigured storage
+ */
+ for k, v := range p.cfg.Data {
+ if _, ok := v.(map[string]interface{})[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ }
+
+ configKey := fmt.Sprintf("%s.%s", PluginName, k)
+ // at this point we know, that driver field present in the cofiguration
+ switch v.(map[string]interface{})[driver] {
+ case memcached:
+ if _, ok := p.drivers[memcached]; !ok {
+ continue
+ }
+ storage, err := p.drivers[memcached].Configure(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+
+ case boltdb:
+ if _, ok := p.drivers[boltdb]; !ok {
+ continue
+ }
+ storage, err := p.drivers[boltdb].Configure(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case memory:
+ if _, ok := p.drivers[memory]; !ok {
+ continue
+ }
+ storage, err := p.drivers[memory].Configure(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case redis:
+ if _, ok := p.drivers[redis]; !ok {
+ continue
+ }
+ storage, err := p.drivers[redis].Configure(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ default:
+ errCh <- errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))
+ }
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+// Collects will get all plugins which implement Storage interface
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetAllStorageDrivers,
+ }
+}
+
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage Storage) {
+ // save the storage driver
+ p.drivers[name.Name()] = storage
+}
+
+// RPC returns associated rpc service.
+func (p *Plugin) RPC() interface{} {
+ return &rpc{srv: p, log: p.log, storages: p.storages}
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}