diff options
Diffstat (limited to 'plugins/kv/drivers/boltdb/plugin.go')
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 113 |
1 files changed, 65 insertions, 48 deletions
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 31194af6..98e2bf22 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -20,6 +20,7 @@ const PluginName = "boltdb" // Plugin BoltDB K/V storage. type Plugin struct { + sync.Mutex // db instance DB *bolt.DB // name should be UTF-8 @@ -42,6 +43,10 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + s.log = log s.cfgPlugin = cfg return nil @@ -49,11 +54,14 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh + s.Lock() + defer s.Unlock() + return make(chan error, 1) } func (s *Plugin) Stop() error { + s.Lock() + defer s.Unlock() const op = errors.Op("boltdb_plugin_stop") if s.DB != nil { err := s.Close() @@ -66,12 +74,19 @@ func (s *Plugin) Stop() error { func (s *Plugin) Configure(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_configure") + s.Lock() + defer s.Unlock() err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) if err != nil { return nil, errors.E(op, err) } + s.bucket = []byte(s.cfg.Bucket) + s.stop = make(chan struct{}, 1) + s.timeout = time.Duration(s.cfg.Interval) * time.Second + s.gc = &sync.Map{} + // add default values s.cfg.InitDefaults() @@ -80,6 +95,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } + s.DB = db + // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { @@ -95,14 +112,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } - s.DB = db - s.bucket = []byte(s.cfg.Bucket) - s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.Interval) * time.Second - s.gc = &sync.Map{} - // start the GC phase - go s.gcPhase() + go s.startGCLoop() return s, nil } @@ -397,6 +408,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} + return s.DB.Close() } @@ -407,48 +419,53 @@ func (s *Plugin) Name() string { // ========================= PRIVATE ================================= -func (s *Plugin) gcPhase() { - t := time.NewTicker(s.timeout) - defer t.Stop() - for { - select { - case <-t.C: - // calculate current time before loop started to be fair - now := time.Now() - s.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } +func (s *Plugin) startGCLoop() { //nolint:gocognit + s.Lock() + defer s.Unlock() + + go func() { + t := time.NewTicker(s.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + s.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } - if now.After(v) { - // time expired - s.gc.Delete(k) - s.log.Debug("key deleted", "key", k) - err := s.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete([]byte(k)) + if now.After(v) { + // time expired + s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) + err := s.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) if err != nil { - return errors.E(op, err) + s.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false } - return nil - }) - if err != nil { - s.log.Error("error during the gc phase of update", "error", err) - // todo this error is ignored, it means, that timer still be active - // to prevent this, we need to invoke t.Stop() - return false } - } - return true - }) - case <-s.stop: - return + return true + }) + case <-s.stop: + return + } } - } + }() } |