diff options
Diffstat (limited to 'plugins/kv/boltdb/plugin.go')
-rw-r--r-- | plugins/kv/boltdb/plugin.go | 148 |
1 files changed, 62 insertions, 86 deletions
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go index e5eda0c2..6cfc49f6 100644 --- a/plugins/kv/boltdb/plugin.go +++ b/plugins/kv/boltdb/plugin.go @@ -2,7 +2,6 @@ package boltdb import ( "bytes" - "context" "encoding/gob" "os" "path" @@ -41,78 +40,23 @@ type Plugin struct { stop chan struct{} } -// NewBoltClient instantiate new BOLTDB client -// The parameters are: -// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created -// perm os.FileMode -- file permissions, for example 0777 -// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other -// bucket string -- name of the bucket to use, should be UTF-8 -func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { - const op = errors.Op("newBoltClient") - db, err := bolt.Open(path, perm, options) - if err != nil { - return nil, errors.E(op, err) - } - - // bucket should be SET - if bucket == "" { - return nil, errors.E(op, errors.Str("bucket should be set")) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - // if TTL is not set, make it default - if ttl == 0 { - ttl = time.Minute - } - - s := &Plugin{ - DB: db, - bucket: []byte(bucket), - stop: make(chan struct{}), - timeout: ttl, - gc: &sync.Map{}, - } - - // start the TTL gc - go s.gcPhase() - - return s, nil -} - func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { const op = errors.Op("boltdb plugin init") s.cfg = &Config{} + s.cfg.InitDefaults() + err := cfg.UnmarshalKey(PluginName, &s.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } + // set the logger s.log = log - return nil -} - -func (s *Plugin) Serve() chan error { - const op = errors.Op("boltdb serve") - errCh := make(chan error, 1) - db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil) if err != nil { - errCh <- errors.E(op, err) - return errCh + return errors.E(op, err) } // create bucket if it does not exist @@ -125,24 +69,29 @@ func (s *Plugin) Serve() chan error { } return nil }) + if err != nil { - errCh <- err - return errCh + return errors.E(op, err) } s.DB = db s.bucket = []byte(s.cfg.Bucket) s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.TTL) * time.Second + s.timeout = time.Duration(s.cfg.Interval) * time.Second s.gc = &sync.Map{} + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) // start the TTL gc go s.gcPhase() return errCh } -func (s Plugin) Stop() error { +func (s *Plugin) Stop() error { const op = errors.Op("boltdb stop") err := s.Close() if err != nil { @@ -151,8 +100,9 @@ func (s Plugin) Stop() error { return nil } -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("boltdb Has") + s.log.Debug("boltdb HAS method called", "args", keys) if keys == nil { return nil, errors.E(op, errors.NoKeys) } @@ -164,8 +114,8 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { return errors.E(op, errors.EmptyKey) } @@ -173,24 +123,25 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error if b == nil { return errors.E(op, errors.NoSuchBucket) } - exist := b.Get([]byte(key)) + exist := b.Get([]byte(keys[i])) if exist != nil { - m[key] = true + m[keys[i]] = true } } return nil }) if err != nil { - return nil, err + return nil, errors.E(op, err) } + s.log.Debug("boltdb HAS method finished") return m, nil } // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s *Plugin) Get(key string) ([]byte, error) { const op = errors.Op("boltdb Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -211,7 +162,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { buf := bytes.NewReader(val) decoder := gob.NewDecoder(buf) - i := kv.Item{} + var i string err := decoder.Decode(&i) if err != nil { // unsafe (w/o runes) convert @@ -219,7 +170,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { } // set the value - val = []byte(i.Value) + val = []byte(i) } return nil }) @@ -230,7 +181,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { return val, nil } -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("boltdb MGet") // defence if keys == nil { @@ -238,8 +189,8 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ } // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { return nil, errors.E(op, errors.EmptyKey) } @@ -253,10 +204,22 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ return errors.E(op, errors.NoSuchBucket) } - for _, key := range keys { - value := b.Get([]byte(key)) + buf := new(bytes.Buffer) + var out string + buf.Grow(100) + for i := range keys { + value := b.Get([]byte(keys[i])) + buf.Write(value) + // allocate enough space + dec := gob.NewDecoder(buf) if value != nil { - m[key] = value + err := dec.Decode(&out) + if err != nil { + return errors.E(op, err) + } + m[keys[i]] = out + buf.Reset() + out = "" } } @@ -270,7 +233,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ } // Set puts the K/V to the bolt -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) Set(items ...kv.Item) error { const op = errors.Op("boltdb Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -303,7 +266,8 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { return errors.E(op, errors.EmptyItem) } - err = encoder.Encode(&items[i]) + // Encode value + err = encoder.Encode(&items[i].Value) if err != nil { return errors.E(op, err) } @@ -321,6 +285,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { if err != nil { return errors.E(op, err) } + // Store key TTL in the separate map s.gc.Store(items[i].Key, items[i].TTL) } @@ -331,7 +296,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { } // Delete all keys from DB -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s *Plugin) Delete(keys ...string) error { const op = errors.Op("boltdb Delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -378,7 +343,7 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("boltdb MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -396,7 +361,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { return nil } -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("boltdb TTL") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -422,15 +387,25 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{} } // Close the DB connection -func (s Plugin) Close() error { +func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} return s.DB.Close() } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} + // ========================= PRIVATE ================================= -func (s Plugin) gcPhase() { +func (s *Plugin) gcPhase() { t := time.NewTicker(s.timeout) defer t.Stop() for { @@ -449,6 +424,7 @@ func (s Plugin) gcPhase() { if now.After(v) { // time expired s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) err := s.DB.Update(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucket) if b == nil { |