diff options
author | Valery Piashchynski <[email protected]> | 2021-04-22 14:41:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-04-22 14:41:50 +0300 |
commit | abf606afd6fd9fbb0fd374ab5da41a8ee8d5a15d (patch) | |
tree | b0cfdeb5c478463d853b8a7409e88cfa7219c3fa /plugins/kv/drivers | |
parent | e4d65a41ec90747a387cfe769f743327959f7105 (diff) |
- Implement tests for the KV
- Implement Storage interface for the boltdb,memory,memcached drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv/drivers')
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 113 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin_unit_test.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 49 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/plugin.go | 30 |
4 files changed, 104 insertions, 90 deletions
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 31194af6..98e2bf22 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -20,6 +20,7 @@ const PluginName = "boltdb" // Plugin BoltDB K/V storage. type Plugin struct { + sync.Mutex // db instance DB *bolt.DB // name should be UTF-8 @@ -42,6 +43,10 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + s.log = log s.cfgPlugin = cfg return nil @@ -49,11 +54,14 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh + s.Lock() + defer s.Unlock() + return make(chan error, 1) } func (s *Plugin) Stop() error { + s.Lock() + defer s.Unlock() const op = errors.Op("boltdb_plugin_stop") if s.DB != nil { err := s.Close() @@ -66,12 +74,19 @@ func (s *Plugin) Stop() error { func (s *Plugin) Configure(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_configure") + s.Lock() + defer s.Unlock() err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) if err != nil { return nil, errors.E(op, err) } + s.bucket = []byte(s.cfg.Bucket) + s.stop = make(chan struct{}, 1) + s.timeout = time.Duration(s.cfg.Interval) * time.Second + s.gc = &sync.Map{} + // add default values s.cfg.InitDefaults() @@ -80,6 +95,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } + s.DB = db + // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { @@ -95,14 +112,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } - s.DB = db - s.bucket = []byte(s.cfg.Bucket) - s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.Interval) * time.Second - s.gc = &sync.Map{} - // start the GC phase - go s.gcPhase() + go s.startGCLoop() return s, nil } @@ -397,6 +408,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} + return s.DB.Close() } @@ -407,48 +419,53 @@ func (s *Plugin) Name() string { // ========================= PRIVATE ================================= -func (s *Plugin) gcPhase() { - t := time.NewTicker(s.timeout) - defer t.Stop() - for { - select { - case <-t.C: - // calculate current time before loop started to be fair - now := time.Now() - s.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } +func (s *Plugin) startGCLoop() { //nolint:gocognit + s.Lock() + defer s.Unlock() + + go func() { + t := time.NewTicker(s.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + s.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } - if now.After(v) { - // time expired - s.gc.Delete(k) - s.log.Debug("key deleted", "key", k) - err := s.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete([]byte(k)) + if now.After(v) { + // time expired + s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) + err := s.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) if err != nil { - return errors.E(op, err) + s.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false } - return nil - }) - if err != nil { - s.log.Error("error during the gc phase of update", "error", err) - // todo this error is ignored, it means, that timer still be active - // to prevent this, we need to invoke t.Stop() - return false } - } - return true - }) - case <-s.stop: - return + return true + }) + case <-s.stop: + return + } } - } + }() } diff --git a/plugins/kv/drivers/boltdb/plugin_unit_test.go b/plugins/kv/drivers/boltdb/plugin_unit_test.go index ad3843e7..50c6c80f 100644 --- a/plugins/kv/drivers/boltdb/plugin_unit_test.go +++ b/plugins/kv/drivers/boltdb/plugin_unit_test.go @@ -62,7 +62,7 @@ func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket } // start the TTL gc - go s.gcPhase() + go s.startGCLoop() return s, nil } diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 496042a6..dff47cb8 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -17,7 +17,8 @@ var EmptyItem = kv.Item{} type Plugin struct { // config - cfg *Config + cfg *Config + cfgPlugin config.Configurer // logger log logger.Logger // memcached client @@ -27,47 +28,39 @@ type Plugin struct { // NewMemcachedClient returns a memcache client using the provided server(s) // with equal weight. If a server is listed multiple times, // it gets a proportional amount of weight. -func NewMemcachedClient(url string) kv.Storage { - m := memcache.New(url) +func NewMemcachedClient(url ...string) kv.Storage { + m := memcache.New(url...) return &Plugin{ client: m, } } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - const op = errors.Op("memcached_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - - s.cfg.InitDefaults() + s.cfgPlugin = cfg s.log = log return nil } -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - s.client = memcache.New(s.cfg.Addr...) - return errCh -} - -// Stop Memcached has no stop/close or smt similar to close the connection -func (s *Plugin) Stop() error { - return nil -} - // Name returns plugin user-friendly name func (s *Plugin) Name() string { return PluginName } func (s *Plugin) Configure(key string) (kv.Storage, error) { - return s, nil + const op = errors.Op("memcached_plugin_configure") + err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize default keys + s.cfg.InitDefaults() + + return NewMemcachedClient(s.cfg.Addr...), nil } // Has checks the key for existence @@ -123,7 +116,7 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, nil } -// return map with key -- string +// MGet return map with key -- string // and map value as value -- []byte func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_mget") @@ -201,7 +194,7 @@ func (s *Plugin) Set(items ...kv.Item) error { return nil } -// Expiration is the cache expiration time, in seconds: either a relative +// MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. func (s *Plugin) MExpire(items ...kv.Item) error { @@ -230,8 +223,8 @@ func (s *Plugin) MExpire(items ...kv.Item) error { return nil } -// return time in seconds (int32) for a given keys -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { +// TTL return time in seconds (int32) for a given keys +func (s *Plugin) TTL(_ ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_ttl") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 73527b97..91fe89d3 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -19,32 +19,25 @@ type Plugin struct { heap sync.Map stop chan struct{} - log logger.Logger - cfg *Config + log logger.Logger + cfg *Config + cfgPlugin config.Configurer } func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("in_memory_plugin_init") - if !cfg.Has(PluginName) { + if !cfg.Has(kv.PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - s.cfg.InitDefaults() s.log = log - + s.cfgPlugin = cfg s.stop = make(chan struct{}, 1) return nil } func (s *Plugin) Serve() chan error { errCh := make(chan error, 1) - // start in-memory gc for kv - go s.gc() - return errCh } @@ -58,6 +51,18 @@ func (s *Plugin) Stop() error { } func (s *Plugin) Configure(key string) (kv.Storage, error) { + const op = errors.Op("inmemory_plugin_configure") + err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize default keys + s.cfg.InitDefaults() + + // start in-memory gc for kv + go s.gc() + return s, nil } @@ -232,7 +237,6 @@ func (s *Plugin) Name() string { // ================================== PRIVATE ====================================== func (s *Plugin) gc() { - // TODO check ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) for { select { |