diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/kv/doc/kv.drawio | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 113 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin_unit_test.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 49 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/plugin.go | 30 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 209 | ||||
-rw-r--r-- | plugins/kv/storage.go | 1 |
7 files changed, 252 insertions, 154 deletions
diff --git a/plugins/kv/doc/kv.drawio b/plugins/kv/doc/kv.drawio index 43afcd2e..9c7456ee 100644 --- a/plugins/kv/doc/kv.drawio +++ b/plugins/kv/doc/kv.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-04-21T18:25:55.980Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.114 Electron/12.0.4 Safari/537.36" etag="AVybp6E5fJIGIqD54uYB" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7VldU6MwFP01fdQBAhQfW4q6M1ad1pnVxwhZiJuSbkg/8NdvKEmBxtY6tWJnfCr35IPknHOTO7QD/MnyisFpMqQRIh3LiJYdMOhYltn1DPFTILlEDM8rkZjhSGIVMMavSHWU6AxHKGt05JQSjqdNMKRpikLewCBjdNHs9oeS5lunMEYaMA4h0dHfOOJJidquV2u4RjhO5KsBMOTKJ1D1lkCWwIguahAIOsBnlPLyabL0ESnoU8SU4y63tK4XwFDK9xkwz4eLkN+mLyF4DqK//vwS/ztz5dp4rnaMIkGADCnjCY1pCklQoX1GZ2mEilkNEVV9biidCtAU4AviPJdqwhmnAkr4hMhWtMT8sfb8VEx17shosJQzr4JcBSln+WM9qI0qwmrYKlLjyv0Vm9pKm4QyOmMh2sGV8h9kMeI7+tlrcUVeIDpBYj1iHEMEcjxvrgNKf8brfpWC4kGK+AFBu6ciaAvCOG0K452KMCeUaaBNQU27TUUrFZ9qLV+TagcwLofeUyzebBmyULDVdS/LBBcYzSlKJ8hRG7qtl3GAlOXb5pDM5BbGD3ej3lWgKdzUb5FgjsZTuKJsIUqgplZyVsQ4Wu5mXmdUDgBOkxlbMbOoqhFTVRhJrRBxje0iHGR7S+NKT4M06hW1l4hCArMMhxsePvoJ8+7JsYX1GqvOG6Qq7EC7m90NUTfFKtNQs7s20bq2VBM5X5w3b2j/c6ntuqz2sOZFm5ca0LK7f3fzMOi3fhA6dtPqTusHobqzfsy/h/ntUzC/rZn/1+3ZMBjejZ6+nf9B+/43NVJ+/L+NLOcU/O9o/hfm93v+dTD4dv5vvxA2T+bb2Sf6+FB/bimITfedivjIheyF5vzRvd+65z3v23lerw9HKMYZRyzT6BIb501OIMFxKp5DQQBiAijowSEkPdkwwVFU5gnK8Ct8Xk1VkD0tlF/txul3nEExl0iNrMyST6K7e/E+3d032LaOxrZ+IJ8zdp5DQajlkoLcZyaeYr4moEb/13+ccK09/WkfjbGuxljrWawfbkdMYxFW/yiVp2P1zxwI/gM=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-04-21T22:20:03.079Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.114 Electron/12.0.4 Safari/537.36" etag="-ll-rpU3Q5Ucu8te95Qq" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7Vldc6IwFP01PrYDBJQ+KtJ2Z2rb0c5sfUwhC+lG4ob4QX/9Bkn4MNW641rqTJ/MPSQhOefc5Kod4M3WNwzO4xENEelYRrjugGHHssyea4iPHMkkYrhugUQMhxKrgAl+Q6qjRBc4RGmjI6eUcDxvggFNEhTwBgYZo6tmt1+UNN86hxHSgEkAiY7+xCGPC9TuurUHtwhHsXw1AIZc+Qyq3hJIYxjSVQ0Cfgd4jFJetGZrD5GcPkVMMe56x9NyAQwl/JABy2y0Cvh98hqAFz/87S2v8Z+Lrlwbz9SOUSgIkCFlPKYRTSDxK3TA6CIJUT6rIaKqzx2lcwGaAnxFnGdSTbjgVEAxnxH5FK0xf661p/lUl46Mhms58ybIVJBwlj3Xg9qoPKyGbSI1rthfvqmdtEkopQsWoD1cKf9BFiG+p59diivyAtEZEusR4xgikONlcx1Q+jMq+1UKioYU8R8E7Z2LoC0I47QpjHsuwpxRpoE2BTXtNhWtVJzWnnxOqh3BuBz6SLF4s2XIQsFW170sE7rAaE5ROEGO2tKtXMYRUhZvW0KykFuYPD2M+ze+pnBTv1WMOZrM4YaylSiBmlrJWRHjaL2feZ1ROQA4TWZsxcyqqkZMVWHEtUKka+wW4SjbWxpXehokYT+vvUQUEJimONjy8MlPmA9Pjh2s11h13iFVYUfa3extibotVpGGmt21icraUk3kfHLevKP996W277I6wJpXbV5qQMvuwcPd03DQ+kHo2E2rO60fhOrO+jb/Aea3z8H8tmb+H/cXI3/0MJ5+Of+D9v1vaqR8+38XWc45+N/R/C/M7/W9W3/45fzffiF8pbE1fvRa58l1vxpPpl5TjFGEU45YqtElNs6bnECCo0S0A0EAYgLI6cEBJH35YIbDsDhbUIrf4MtmqpzseV72bnbjDDrOMJ9LHCdpcbL8J7p7Vx/T3XuHbetkbOtJfMnYZQYFoVaX5OS+MNGKeElAjf7P/0LbtQ70p30yxlyNsdazePsHkTKrT5DFIqz+hCi+GVZ/5gD/Lw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 31194af6..98e2bf22 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -20,6 +20,7 @@ const PluginName = "boltdb" // Plugin BoltDB K/V storage. type Plugin struct { + sync.Mutex // db instance DB *bolt.DB // name should be UTF-8 @@ -42,6 +43,10 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + s.log = log s.cfgPlugin = cfg return nil @@ -49,11 +54,14 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh + s.Lock() + defer s.Unlock() + return make(chan error, 1) } func (s *Plugin) Stop() error { + s.Lock() + defer s.Unlock() const op = errors.Op("boltdb_plugin_stop") if s.DB != nil { err := s.Close() @@ -66,12 +74,19 @@ func (s *Plugin) Stop() error { func (s *Plugin) Configure(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_configure") + s.Lock() + defer s.Unlock() err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) if err != nil { return nil, errors.E(op, err) } + s.bucket = []byte(s.cfg.Bucket) + s.stop = make(chan struct{}, 1) + s.timeout = time.Duration(s.cfg.Interval) * time.Second + s.gc = &sync.Map{} + // add default values s.cfg.InitDefaults() @@ -80,6 +95,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } + s.DB = db + // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { @@ -95,14 +112,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } - s.DB = db - s.bucket = []byte(s.cfg.Bucket) - s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.Interval) * time.Second - s.gc = &sync.Map{} - // start the GC phase - go s.gcPhase() + go s.startGCLoop() return s, nil } @@ -397,6 +408,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} + return s.DB.Close() } @@ -407,48 +419,53 @@ func (s *Plugin) Name() string { // ========================= PRIVATE ================================= -func (s *Plugin) gcPhase() { - t := time.NewTicker(s.timeout) - defer t.Stop() - for { - select { - case <-t.C: - // calculate current time before loop started to be fair - now := time.Now() - s.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } +func (s *Plugin) startGCLoop() { //nolint:gocognit + s.Lock() + defer s.Unlock() + + go func() { + t := time.NewTicker(s.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + s.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } - if now.After(v) { - // time expired - s.gc.Delete(k) - s.log.Debug("key deleted", "key", k) - err := s.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete([]byte(k)) + if now.After(v) { + // time expired + s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) + err := s.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) if err != nil { - return errors.E(op, err) + s.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false } - return nil - }) - if err != nil { - s.log.Error("error during the gc phase of update", "error", err) - // todo this error is ignored, it means, that timer still be active - // to prevent this, we need to invoke t.Stop() - return false } - } - return true - }) - case <-s.stop: - return + return true + }) + case <-s.stop: + return + } } - } + }() } diff --git a/plugins/kv/drivers/boltdb/plugin_unit_test.go b/plugins/kv/drivers/boltdb/plugin_unit_test.go index ad3843e7..50c6c80f 100644 --- a/plugins/kv/drivers/boltdb/plugin_unit_test.go +++ b/plugins/kv/drivers/boltdb/plugin_unit_test.go @@ -62,7 +62,7 @@ func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket } // start the TTL gc - go s.gcPhase() + go s.startGCLoop() return s, nil } diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 496042a6..dff47cb8 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -17,7 +17,8 @@ var EmptyItem = kv.Item{} type Plugin struct { // config - cfg *Config + cfg *Config + cfgPlugin config.Configurer // logger log logger.Logger // memcached client @@ -27,47 +28,39 @@ type Plugin struct { // NewMemcachedClient returns a memcache client using the provided server(s) // with equal weight. If a server is listed multiple times, // it gets a proportional amount of weight. -func NewMemcachedClient(url string) kv.Storage { - m := memcache.New(url) +func NewMemcachedClient(url ...string) kv.Storage { + m := memcache.New(url...) return &Plugin{ client: m, } } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - const op = errors.Op("memcached_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - - s.cfg.InitDefaults() + s.cfgPlugin = cfg s.log = log return nil } -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - s.client = memcache.New(s.cfg.Addr...) - return errCh -} - -// Stop Memcached has no stop/close or smt similar to close the connection -func (s *Plugin) Stop() error { - return nil -} - // Name returns plugin user-friendly name func (s *Plugin) Name() string { return PluginName } func (s *Plugin) Configure(key string) (kv.Storage, error) { - return s, nil + const op = errors.Op("memcached_plugin_configure") + err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize default keys + s.cfg.InitDefaults() + + return NewMemcachedClient(s.cfg.Addr...), nil } // Has checks the key for existence @@ -123,7 +116,7 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, nil } -// return map with key -- string +// MGet return map with key -- string // and map value as value -- []byte func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_mget") @@ -201,7 +194,7 @@ func (s *Plugin) Set(items ...kv.Item) error { return nil } -// Expiration is the cache expiration time, in seconds: either a relative +// MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. func (s *Plugin) MExpire(items ...kv.Item) error { @@ -230,8 +223,8 @@ func (s *Plugin) MExpire(items ...kv.Item) error { return nil } -// return time in seconds (int32) for a given keys -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { +// TTL return time in seconds (int32) for a given keys +func (s *Plugin) TTL(_ ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_ttl") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 73527b97..91fe89d3 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -19,32 +19,25 @@ type Plugin struct { heap sync.Map stop chan struct{} - log logger.Logger - cfg *Config + log logger.Logger + cfg *Config + cfgPlugin config.Configurer } func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("in_memory_plugin_init") - if !cfg.Has(PluginName) { + if !cfg.Has(kv.PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - s.cfg.InitDefaults() s.log = log - + s.cfgPlugin = cfg s.stop = make(chan struct{}, 1) return nil } func (s *Plugin) Serve() chan error { errCh := make(chan error, 1) - // start in-memory gc for kv - go s.gc() - return errCh } @@ -58,6 +51,18 @@ func (s *Plugin) Stop() error { } func (s *Plugin) Configure(key string) (kv.Storage, error) { + const op = errors.Op("inmemory_plugin_configure") + err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize default keys + s.cfg.InitDefaults() + + // start in-memory gc for kv + go s.gc() + return s, nil } @@ -232,7 +237,6 @@ func (s *Plugin) Name() string { // ================================== PRIVATE ====================================== func (s *Plugin) gc() { - // TODO check ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) for { select { diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 69b91981..cc3875e0 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -49,13 +49,13 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -//Set accept []byte flatbuffers payload with Storage and Item +// Set accept []byte flatbuffers payload with Storage and Item func (r *rpc) Set(in []byte, ok *bool) error { const op = errors.Op("rpc_set") dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() + items := make([]Item, 0, dataRoot.ItemsLength()) tmpItem := &generated.Item{} @@ -84,70 +84,155 @@ func (r *rpc) Set(in []byte, ok *bool) error { return nil } + *ok = false return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } // MGet accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MGet(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server MGet") -// ret, err := r.svc.MGet(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// // update return value -// *res = ret -// return nil -//} - -//// MExpire accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MExpire(in []Item, ok *bool) error { -// const op = errors.Op("rpc server MExpire") -// -// err := r.svc.MExpire(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *ok = true -// return nil -//} -// -//// TTL accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) TTL(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server TTL") -// -// ret, err := r.svc.TTL(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *res = ret -// return nil -//} -// -//// Delete accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) Delete(in []string, ok *bool) error { -// const op = errors.Op("rpc server Delete") -// err := r.svc.Delete(in...) -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// return nil -//} -// -//// Close closes the storage connection -//func (r *rpc) Close(storage string, ok *bool) error { -// const op = errors.Op("rpc server Close") -// err := r.svc.Close() -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// -// return nil -//} +func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_mget") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.MGet(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// MExpire accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MExpire(in []byte, ok *bool) error { + const op = errors.Op("rpc_mexpire") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + // when unmarshalling the keys, simultaneously, fill up the slice with items + items := make([]Item, 0, l) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + // we set up timeout on the keys, so, value here is redundant + Value: "", + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.MExpire(items...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// TTL accept []byte flatbuffers payload with Storage and Item +func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_ttl") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.TTL(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Delete accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Delete(in []byte, ok *bool) error { + const op = errors.Op("rcp_delete") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Delete(keys...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Close closes the storage connection +func (r *rpc) Close(storage string, ok *bool) error { + const op = errors.Op("rpc_close") + if st, exists := r.storages[storage]; exists { + err := st.Close() + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return nil +} func strConvert(s []byte) string { return *(*string)(unsafe.Pointer(&s)) diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index 81ca2d91..b779665b 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -105,7 +105,6 @@ func (p *Plugin) Serve() chan error { // save the storage p.storages[k] = storage - case boltdb: if _, ok := p.drivers[boltdb]; !ok { continue |