summaryrefslogtreecommitdiff
path: root/plugins/kv/boltdb/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/boltdb/plugin.go')
-rw-r--r--plugins/kv/boltdb/plugin.go148
1 files changed, 62 insertions, 86 deletions
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go
index e5eda0c2..6cfc49f6 100644
--- a/plugins/kv/boltdb/plugin.go
+++ b/plugins/kv/boltdb/plugin.go
@@ -2,7 +2,6 @@ package boltdb
import (
"bytes"
- "context"
"encoding/gob"
"os"
"path"
@@ -41,78 +40,23 @@ type Plugin struct {
stop chan struct{}
}
-// NewBoltClient instantiate new BOLTDB client
-// The parameters are:
-// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
-// perm os.FileMode -- file permissions, for example 0777
-// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
-// bucket string -- name of the bucket to use, should be UTF-8
-func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
- const op = errors.Op("newBoltClient")
- db, err := bolt.Open(path, perm, options)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // bucket should be SET
- if bucket == "" {
- return nil, errors.E(op, errors.Str("bucket should be set"))
- }
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- _, err = tx.CreateBucketIfNotExists([]byte(bucket))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // if TTL is not set, make it default
- if ttl == 0 {
- ttl = time.Minute
- }
-
- s := &Plugin{
- DB: db,
- bucket: []byte(bucket),
- stop: make(chan struct{}),
- timeout: ttl,
- gc: &sync.Map{},
- }
-
- // start the TTL gc
- go s.gcPhase()
-
- return s, nil
-}
-
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
const op = errors.Op("boltdb plugin init")
s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
err := cfg.UnmarshalKey(PluginName, &s.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
+ // set the logger
s.log = log
- return nil
-}
-
-func (s *Plugin) Serve() chan error {
- const op = errors.Op("boltdb serve")
- errCh := make(chan error, 1)
-
db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
if err != nil {
- errCh <- errors.E(op, err)
- return errCh
+ return errors.E(op, err)
}
// create bucket if it does not exist
@@ -125,24 +69,29 @@ func (s *Plugin) Serve() chan error {
}
return nil
})
+
if err != nil {
- errCh <- err
- return errCh
+ return errors.E(op, err)
}
s.DB = db
s.bucket = []byte(s.cfg.Bucket)
s.stop = make(chan struct{})
- s.timeout = time.Duration(s.cfg.TTL) * time.Second
+ s.timeout = time.Duration(s.cfg.Interval) * time.Second
s.gc = &sync.Map{}
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
// start the TTL gc
go s.gcPhase()
return errCh
}
-func (s Plugin) Stop() error {
+func (s *Plugin) Stop() error {
const op = errors.Op("boltdb stop")
err := s.Close()
if err != nil {
@@ -151,8 +100,9 @@ func (s Plugin) Stop() error {
return nil
}
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("boltdb Has")
+ s.log.Debug("boltdb HAS method called", "args", keys)
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
@@ -164,8 +114,8 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return errors.E(op, errors.EmptyKey)
}
@@ -173,24 +123,25 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- exist := b.Get([]byte(key))
+ exist := b.Get([]byte(keys[i]))
if exist != nil {
- m[key] = true
+ m[keys[i]] = true
}
}
return nil
})
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
+ s.log.Debug("boltdb HAS method finished")
return m, nil
}
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s *Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("boltdb Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -211,7 +162,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
buf := bytes.NewReader(val)
decoder := gob.NewDecoder(buf)
- i := kv.Item{}
+ var i string
err := decoder.Decode(&i)
if err != nil {
// unsafe (w/o runes) convert
@@ -219,7 +170,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
}
// set the value
- val = []byte(i.Value)
+ val = []byte(i)
}
return nil
})
@@ -230,7 +181,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
return val, nil
}
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("boltdb MGet")
// defence
if keys == nil {
@@ -238,8 +189,8 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
}
// should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
@@ -253,10 +204,22 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
return errors.E(op, errors.NoSuchBucket)
}
- for _, key := range keys {
- value := b.Get([]byte(key))
+ buf := new(bytes.Buffer)
+ var out string
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
if value != nil {
- m[key] = value
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = ""
}
}
@@ -270,7 +233,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
}
// Set puts the K/V to the bolt
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) Set(items ...kv.Item) error {
const op = errors.Op("boltdb Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -303,7 +266,8 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
return errors.E(op, errors.EmptyItem)
}
- err = encoder.Encode(&items[i])
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
if err != nil {
return errors.E(op, err)
}
@@ -321,6 +285,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
if err != nil {
return errors.E(op, err)
}
+ // Store key TTL in the separate map
s.gc.Store(items[i].Key, items[i].TTL)
}
@@ -331,7 +296,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
}
// Delete all keys from DB
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s *Plugin) Delete(keys ...string) error {
const op = errors.Op("boltdb Delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -378,7 +343,7 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("boltdb MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -396,7 +361,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
return nil
}
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("boltdb TTL")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -422,15 +387,25 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}
}
// Close the DB connection
-func (s Plugin) Close() error {
+func (s *Plugin) Close() error {
// stop the keys GC
s.stop <- struct{}{}
return s.DB.Close()
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// ========================= PRIVATE =================================
-func (s Plugin) gcPhase() {
+func (s *Plugin) gcPhase() {
t := time.NewTicker(s.timeout)
defer t.Stop()
for {
@@ -449,6 +424,7 @@ func (s Plugin) gcPhase() {
if now.After(v) {
// time expired
s.gc.Delete(k)
+ s.log.Debug("key deleted", "key", k)
err := s.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
if b == nil {