diff options
-rw-r--r-- | .github/workflows/linux.yml | 3 | ||||
-rwxr-xr-x | Makefile | 3 | ||||
-rw-r--r-- | plugins/kv/doc/kv.drawio | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 113 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin_unit_test.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 49 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/plugin.go | 30 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 209 | ||||
-rw-r--r-- | plugins/kv/storage.go | 1 | ||||
-rw-r--r-- | tests/plugins/kv/boltdb/configs/.rr-init.yaml | 45 | ||||
-rw-r--r-- | tests/plugins/kv/boltdb/plugin_test.go | 195 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-boltdb.yaml | 15 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-in-memory.yaml | 11 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-kv-init.yaml | 4 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-memcached.yaml | 12 | ||||
-rw-r--r-- | tests/plugins/kv/memcached/configs/.rr-init.yaml | 42 | ||||
-rw-r--r-- | tests/plugins/kv/memcached/plugin_test.go | 195 | ||||
-rw-r--r-- | tests/plugins/kv/memory/configs/.rr-init.yaml | 42 | ||||
-rw-r--r-- | tests/plugins/kv/memory/plugin_test.go | 195 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 606 |
20 files changed, 896 insertions, 878 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 4c3e370f..c0e90ec4 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -80,9 +80,6 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/boltdb_unit.txt -covermode=atomic ./plugins/kv/drivers/boltdb go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_unit.txt -covermode=atomic ./plugins/kv/drivers/memory go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/memcached_unit.txt -covermode=atomic ./plugins/kv/drivers/memcached - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/boltdb.txt -covermode=atomic ./tests/plugins/kv/boltdb - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/memory.txt -covermode=atomic ./tests/plugins/kv/memory - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/memcached.txt -covermode=atomic ./tests/plugins/kv/memcached go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv docker-compose -f ./tests/docker-compose.yaml down cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -84,8 +84,5 @@ test: ## Run application tests go test -v -race -tags=debug ./plugins/kv/drivers/boltdb go test -v -race -tags=debug ./plugins/kv/drivers/memory go test -v -race -tags=debug ./plugins/kv/drivers/memcached - go test -v -race -tags=debug ./tests/plugins/kv/boltdb - go test -v -race -tags=debug ./tests/plugins/kv/memory - go test -v -race -tags=debug ./tests/plugins/kv/memcached go test -v -race -tags=debug ./tests/plugins/kv docker-compose -f tests/docker-compose.yaml down diff --git a/plugins/kv/doc/kv.drawio b/plugins/kv/doc/kv.drawio index 43afcd2e..9c7456ee 100644 --- a/plugins/kv/doc/kv.drawio +++ b/plugins/kv/doc/kv.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-04-21T18:25:55.980Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.114 Electron/12.0.4 Safari/537.36" etag="AVybp6E5fJIGIqD54uYB" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7VldU6MwFP01fdQBAhQfW4q6M1ad1pnVxwhZiJuSbkg/8NdvKEmBxtY6tWJnfCr35IPknHOTO7QD/MnyisFpMqQRIh3LiJYdMOhYltn1DPFTILlEDM8rkZjhSGIVMMavSHWU6AxHKGt05JQSjqdNMKRpikLewCBjdNHs9oeS5lunMEYaMA4h0dHfOOJJidquV2u4RjhO5KsBMOTKJ1D1lkCWwIguahAIOsBnlPLyabL0ESnoU8SU4y63tK4XwFDK9xkwz4eLkN+mLyF4DqK//vwS/ztz5dp4rnaMIkGADCnjCY1pCklQoX1GZ2mEilkNEVV9biidCtAU4AviPJdqwhmnAkr4hMhWtMT8sfb8VEx17shosJQzr4JcBSln+WM9qI0qwmrYKlLjyv0Vm9pKm4QyOmMh2sGV8h9kMeI7+tlrcUVeIDpBYj1iHEMEcjxvrgNKf8brfpWC4kGK+AFBu6ciaAvCOG0K452KMCeUaaBNQU27TUUrFZ9qLV+TagcwLofeUyzebBmyULDVdS/LBBcYzSlKJ8hRG7qtl3GAlOXb5pDM5BbGD3ej3lWgKdzUb5FgjsZTuKJsIUqgplZyVsQ4Wu5mXmdUDgBOkxlbMbOoqhFTVRhJrRBxje0iHGR7S+NKT4M06hW1l4hCArMMhxsePvoJ8+7JsYX1GqvOG6Qq7EC7m90NUTfFKtNQs7s20bq2VBM5X5w3b2j/c6ntuqz2sOZFm5ca0LK7f3fzMOi3fhA6dtPqTusHobqzfsy/h/ntUzC/rZn/1+3ZMBjejZ6+nf9B+/43NVJ+/L+NLOcU/O9o/hfm93v+dTD4dv5vvxA2T+bb2Sf6+FB/bimITfedivjIheyF5vzRvd+65z3v23lerw9HKMYZRyzT6BIb501OIMFxKp5DQQBiAijowSEkPdkwwVFU5gnK8Ct8Xk1VkD0tlF/txul3nEExl0iNrMyST6K7e/E+3d032LaOxrZ+IJ8zdp5DQajlkoLcZyaeYr4moEb/13+ccK09/WkfjbGuxljrWawfbkdMYxFW/yiVp2P1zxwI/gM=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-04-21T22:20:03.079Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.114 Electron/12.0.4 Safari/537.36" etag="-ll-rpU3Q5Ucu8te95Qq" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7Vldc6IwFP01PrYDBJQ+KtJ2Z2rb0c5sfUwhC+lG4ob4QX/9Bkn4MNW641rqTJ/MPSQhOefc5Kod4M3WNwzO4xENEelYRrjugGHHssyea4iPHMkkYrhugUQMhxKrgAl+Q6qjRBc4RGmjI6eUcDxvggFNEhTwBgYZo6tmt1+UNN86hxHSgEkAiY7+xCGPC9TuurUHtwhHsXw1AIZc+Qyq3hJIYxjSVQ0Cfgd4jFJetGZrD5GcPkVMMe56x9NyAQwl/JABy2y0Cvh98hqAFz/87S2v8Z+Lrlwbz9SOUSgIkCFlPKYRTSDxK3TA6CIJUT6rIaKqzx2lcwGaAnxFnGdSTbjgVEAxnxH5FK0xf661p/lUl46Mhms58ybIVJBwlj3Xg9qoPKyGbSI1rthfvqmdtEkopQsWoD1cKf9BFiG+p59diivyAtEZEusR4xgikONlcx1Q+jMq+1UKioYU8R8E7Z2LoC0I47QpjHsuwpxRpoE2BTXtNhWtVJzWnnxOqh3BuBz6SLF4s2XIQsFW170sE7rAaE5ROEGO2tKtXMYRUhZvW0KykFuYPD2M+ze+pnBTv1WMOZrM4YaylSiBmlrJWRHjaL2feZ1ROQA4TWZsxcyqqkZMVWHEtUKka+wW4SjbWxpXehokYT+vvUQUEJimONjy8MlPmA9Pjh2s11h13iFVYUfa3extibotVpGGmt21icraUk3kfHLevKP996W277I6wJpXbV5qQMvuwcPd03DQ+kHo2E2rO60fhOrO+jb/Aea3z8H8tmb+H/cXI3/0MJ5+Of+D9v1vaqR8+38XWc45+N/R/C/M7/W9W3/45fzffiF8pbE1fvRa58l1vxpPpl5TjFGEU45YqtElNs6bnECCo0S0A0EAYgLI6cEBJH35YIbDsDhbUIrf4MtmqpzseV72bnbjDDrOMJ9LHCdpcbL8J7p7Vx/T3XuHbetkbOtJfMnYZQYFoVaX5OS+MNGKeElAjf7P/0LbtQ70p30yxlyNsdazePsHkTKrT5DFIqz+hCi+GVZ/5gD/Lw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 31194af6..98e2bf22 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -20,6 +20,7 @@ const PluginName = "boltdb" // Plugin BoltDB K/V storage. type Plugin struct { + sync.Mutex // db instance DB *bolt.DB // name should be UTF-8 @@ -42,6 +43,10 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + s.log = log s.cfgPlugin = cfg return nil @@ -49,11 +54,14 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh + s.Lock() + defer s.Unlock() + return make(chan error, 1) } func (s *Plugin) Stop() error { + s.Lock() + defer s.Unlock() const op = errors.Op("boltdb_plugin_stop") if s.DB != nil { err := s.Close() @@ -66,12 +74,19 @@ func (s *Plugin) Stop() error { func (s *Plugin) Configure(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_configure") + s.Lock() + defer s.Unlock() err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) if err != nil { return nil, errors.E(op, err) } + s.bucket = []byte(s.cfg.Bucket) + s.stop = make(chan struct{}, 1) + s.timeout = time.Duration(s.cfg.Interval) * time.Second + s.gc = &sync.Map{} + // add default values s.cfg.InitDefaults() @@ -80,6 +95,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } + s.DB = db + // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { @@ -95,14 +112,8 @@ func (s *Plugin) Configure(key string) (kv.Storage, error) { return nil, errors.E(op, err) } - s.DB = db - s.bucket = []byte(s.cfg.Bucket) - s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.Interval) * time.Second - s.gc = &sync.Map{} - // start the GC phase - go s.gcPhase() + go s.startGCLoop() return s, nil } @@ -397,6 +408,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} + return s.DB.Close() } @@ -407,48 +419,53 @@ func (s *Plugin) Name() string { // ========================= PRIVATE ================================= -func (s *Plugin) gcPhase() { - t := time.NewTicker(s.timeout) - defer t.Stop() - for { - select { - case <-t.C: - // calculate current time before loop started to be fair - now := time.Now() - s.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } +func (s *Plugin) startGCLoop() { //nolint:gocognit + s.Lock() + defer s.Unlock() + + go func() { + t := time.NewTicker(s.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + s.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } - if now.After(v) { - // time expired - s.gc.Delete(k) - s.log.Debug("key deleted", "key", k) - err := s.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete([]byte(k)) + if now.After(v) { + // time expired + s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) + err := s.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) if err != nil { - return errors.E(op, err) + s.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false } - return nil - }) - if err != nil { - s.log.Error("error during the gc phase of update", "error", err) - // todo this error is ignored, it means, that timer still be active - // to prevent this, we need to invoke t.Stop() - return false } - } - return true - }) - case <-s.stop: - return + return true + }) + case <-s.stop: + return + } } - } + }() } diff --git a/plugins/kv/drivers/boltdb/plugin_unit_test.go b/plugins/kv/drivers/boltdb/plugin_unit_test.go index ad3843e7..50c6c80f 100644 --- a/plugins/kv/drivers/boltdb/plugin_unit_test.go +++ b/plugins/kv/drivers/boltdb/plugin_unit_test.go @@ -62,7 +62,7 @@ func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket } // start the TTL gc - go s.gcPhase() + go s.startGCLoop() return s, nil } diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 496042a6..dff47cb8 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -17,7 +17,8 @@ var EmptyItem = kv.Item{} type Plugin struct { // config - cfg *Config + cfg *Config + cfgPlugin config.Configurer // logger log logger.Logger // memcached client @@ -27,47 +28,39 @@ type Plugin struct { // NewMemcachedClient returns a memcache client using the provided server(s) // with equal weight. If a server is listed multiple times, // it gets a proportional amount of weight. -func NewMemcachedClient(url string) kv.Storage { - m := memcache.New(url) +func NewMemcachedClient(url ...string) kv.Storage { + m := memcache.New(url...) return &Plugin{ client: m, } } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - const op = errors.Op("memcached_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - - s.cfg.InitDefaults() + s.cfgPlugin = cfg s.log = log return nil } -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - s.client = memcache.New(s.cfg.Addr...) - return errCh -} - -// Stop Memcached has no stop/close or smt similar to close the connection -func (s *Plugin) Stop() error { - return nil -} - // Name returns plugin user-friendly name func (s *Plugin) Name() string { return PluginName } func (s *Plugin) Configure(key string) (kv.Storage, error) { - return s, nil + const op = errors.Op("memcached_plugin_configure") + err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize default keys + s.cfg.InitDefaults() + + return NewMemcachedClient(s.cfg.Addr...), nil } // Has checks the key for existence @@ -123,7 +116,7 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, nil } -// return map with key -- string +// MGet return map with key -- string // and map value as value -- []byte func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_mget") @@ -201,7 +194,7 @@ func (s *Plugin) Set(items ...kv.Item) error { return nil } -// Expiration is the cache expiration time, in seconds: either a relative +// MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. func (s *Plugin) MExpire(items ...kv.Item) error { @@ -230,8 +223,8 @@ func (s *Plugin) MExpire(items ...kv.Item) error { return nil } -// return time in seconds (int32) for a given keys -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { +// TTL return time in seconds (int32) for a given keys +func (s *Plugin) TTL(_ ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_ttl") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 73527b97..91fe89d3 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -19,32 +19,25 @@ type Plugin struct { heap sync.Map stop chan struct{} - log logger.Logger - cfg *Config + log logger.Logger + cfg *Config + cfgPlugin config.Configurer } func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("in_memory_plugin_init") - if !cfg.Has(PluginName) { + if !cfg.Has(kv.PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - s.cfg.InitDefaults() s.log = log - + s.cfgPlugin = cfg s.stop = make(chan struct{}, 1) return nil } func (s *Plugin) Serve() chan error { errCh := make(chan error, 1) - // start in-memory gc for kv - go s.gc() - return errCh } @@ -58,6 +51,18 @@ func (s *Plugin) Stop() error { } func (s *Plugin) Configure(key string) (kv.Storage, error) { + const op = errors.Op("inmemory_plugin_configure") + err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize default keys + s.cfg.InitDefaults() + + // start in-memory gc for kv + go s.gc() + return s, nil } @@ -232,7 +237,6 @@ func (s *Plugin) Name() string { // ================================== PRIVATE ====================================== func (s *Plugin) gc() { - // TODO check ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) for { select { diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 69b91981..cc3875e0 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -49,13 +49,13 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -//Set accept []byte flatbuffers payload with Storage and Item +// Set accept []byte flatbuffers payload with Storage and Item func (r *rpc) Set(in []byte, ok *bool) error { const op = errors.Op("rpc_set") dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() + items := make([]Item, 0, dataRoot.ItemsLength()) tmpItem := &generated.Item{} @@ -84,70 +84,155 @@ func (r *rpc) Set(in []byte, ok *bool) error { return nil } + *ok = false return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } // MGet accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MGet(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server MGet") -// ret, err := r.svc.MGet(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// // update return value -// *res = ret -// return nil -//} - -//// MExpire accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MExpire(in []Item, ok *bool) error { -// const op = errors.Op("rpc server MExpire") -// -// err := r.svc.MExpire(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *ok = true -// return nil -//} -// -//// TTL accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) TTL(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server TTL") -// -// ret, err := r.svc.TTL(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *res = ret -// return nil -//} -// -//// Delete accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) Delete(in []string, ok *bool) error { -// const op = errors.Op("rpc server Delete") -// err := r.svc.Delete(in...) -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// return nil -//} -// -//// Close closes the storage connection -//func (r *rpc) Close(storage string, ok *bool) error { -// const op = errors.Op("rpc server Close") -// err := r.svc.Close() -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// -// return nil -//} +func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_mget") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.MGet(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// MExpire accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MExpire(in []byte, ok *bool) error { + const op = errors.Op("rpc_mexpire") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + // when unmarshalling the keys, simultaneously, fill up the slice with items + items := make([]Item, 0, l) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + // we set up timeout on the keys, so, value here is redundant + Value: "", + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.MExpire(items...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// TTL accept []byte flatbuffers payload with Storage and Item +func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_ttl") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.TTL(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Delete accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Delete(in []byte, ok *bool) error { + const op = errors.Op("rcp_delete") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Delete(keys...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Close closes the storage connection +func (r *rpc) Close(storage string, ok *bool) error { + const op = errors.Op("rpc_close") + if st, exists := r.storages[storage]; exists { + err := st.Close() + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return nil +} func strConvert(s []byte) string { return *(*string)(unsafe.Pointer(&s)) diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index 81ca2d91..b779665b 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -105,7 +105,6 @@ func (p *Plugin) Serve() chan error { // save the storage p.storages[k] = storage - case boltdb: if _, ok := p.drivers[boltdb]; !ok { continue diff --git a/tests/plugins/kv/boltdb/configs/.rr-init.yaml b/tests/plugins/kv/boltdb/configs/.rr-init.yaml deleted file mode 100644 index e4644511..00000000 --- a/tests/plugins/kv/boltdb/configs/.rr-init.yaml +++ /dev/null @@ -1,45 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../../psr-worker-bench.php" - user: "" - group: "" - env: - "RR_HTTP": "true" - relay: "pipes" - relay_timeout: "20s" - -logs: - mode: development - level: error - -http: - address: 127.0.0.1:44933 - max_request_size: 1024 - middleware: ["gzip", "headers"] - uploads: - forbid: [".php", ".exe", ".bat"] - trusted_subnets: - [ - "10.0.0.0/8", - "127.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "::1/128", - "fc00::/7", - "fe80::/10", - ] - pool: - num_workers: 6 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -# boltdb simple driver -boltdb: - dir: "." - file: "rr" - bucket: "test" - permissions: 777 - interval: 1 # seconds diff --git a/tests/plugins/kv/boltdb/plugin_test.go b/tests/plugins/kv/boltdb/plugin_test.go deleted file mode 100644 index fad7c7a3..00000000 --- a/tests/plugins/kv/boltdb/plugin_test.go +++ /dev/null @@ -1,195 +0,0 @@ -package boltdb_tests //nolint:golint,stylecheck - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" - httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" - "github.com/spiral/roadrunner/v2/plugins/logger" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/stretchr/testify/assert" -) - -func TestBoltDb(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-init.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &boltdb.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &server.Plugin{}, - &httpPlugin.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("testBoltDbRPCMethods", testRPCMethods) - stopCh <- struct{}{} - wg.Wait() - - _ = os.Remove("rr") -} - -func testRPCMethods(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - var setRes bool - items := make([]kv.Item, 0, 5) - items = append(items, kv.Item{ - Key: "a", - Value: "aa", - }) - items = append(items, kv.Item{ - Key: "b", - Value: "bb", - }) - // add 5 second ttl - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - items = append(items, kv.Item{ - Key: "c", - Value: "cc", - TTL: tt, - }) - - items = append(items, kv.Item{ - Key: "d", - Value: "dd", - }) - - items = append(items, kv.Item{ - Key: "e", - Value: "ee", - }) - - // Register 3 keys with values - err = client.Call("boltdb.Set", items, &setRes) - assert.NoError(t, err) - assert.True(t, setRes) - - ret := make(map[string]bool) - keys := []string{"a", "b", "c"} - err = client.Call("boltdb.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = make(map[string]bool) - err = client.Call("boltdb.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 - - mGet := make(map[string]interface{}) - keys = []string{"a", "b", "c"} - err = client.Call("boltdb.MGet", keys, &mGet) - assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, string("aa"), mGet["a"].(string)) - assert.Equal(t, string("bb"), mGet["b"].(string)) - - mExpKeys := make([]kv.Item, 0, 2) - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2}) - mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2}) - mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2}) - - // MEXPIRE - var mExpRes bool - err = client.Call("boltdb.MExpire", mExpKeys, &mExpRes) - assert.NoError(t, err) - assert.True(t, mExpRes) - - // TTL - keys = []string{"a", "b", "d"} - ttlRes := make(map[string]interface{}) - err = client.Call("boltdb.TTL", keys, &ttlRes) - assert.NoError(t, err) - assert.Len(t, ttlRes, 3) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = make(map[string]bool) - keys = []string{"a", "b", "d"} - err = client.Call("boltdb.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 0) - - // DELETE - keys = []string{"e"} - var delRet bool - err = client.Call("boltdb.Delete", keys, &delRet) - assert.NoError(t, err) - assert.True(t, delRet) - - // HAS AFTER DELETE - ret = make(map[string]bool) - keys = []string{"e"} - err = client.Call("boltdb.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 0) -} diff --git a/tests/plugins/kv/configs/.rr-boltdb.yaml b/tests/plugins/kv/configs/.rr-boltdb.yaml new file mode 100644 index 00000000..81b47715 --- /dev/null +++ b/tests/plugins/kv/configs/.rr-boltdb.yaml @@ -0,0 +1,15 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +logs: + mode: development + level: error + +kv: + boltdb-rr: + driver: boltdb + dir: "." + file: "rr.db" + bucket: "test" + permissions: 0666 + interval: 1 # seconds diff --git a/tests/plugins/kv/configs/.rr-in-memory.yaml b/tests/plugins/kv/configs/.rr-in-memory.yaml new file mode 100644 index 00000000..c6c3f0f7 --- /dev/null +++ b/tests/plugins/kv/configs/.rr-in-memory.yaml @@ -0,0 +1,11 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +logs: + mode: development + level: error + +kv: + default: + driver: memory + interval: 1 diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml index 935b952d..34e22a4e 100644 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ b/tests/plugins/kv/configs/.rr-kv-init.yaml @@ -12,7 +12,7 @@ kv: file: "rr.db" bucket: "rr" permissions: 0666 - ttl: 40s + interval: 1 boltdb-africa: driver: boltdb @@ -20,7 +20,7 @@ kv: file: "africa.db" bucket: "rr" permissions: 0666 - ttl: 40 + interval: 1 memcached: driver: memcached diff --git a/tests/plugins/kv/configs/.rr-memcached.yaml b/tests/plugins/kv/configs/.rr-memcached.yaml new file mode 100644 index 00000000..68443bc4 --- /dev/null +++ b/tests/plugins/kv/configs/.rr-memcached.yaml @@ -0,0 +1,12 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +logs: + mode: development + level: error + +kv: + memcached-rr: + driver: memcached + addr: + - "localhost:11211" diff --git a/tests/plugins/kv/memcached/configs/.rr-init.yaml b/tests/plugins/kv/memcached/configs/.rr-init.yaml deleted file mode 100644 index fbca3250..00000000 --- a/tests/plugins/kv/memcached/configs/.rr-init.yaml +++ /dev/null @@ -1,42 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../../psr-worker-bench.php" - user: "" - group: "" - env: - "RR_HTTP": "true" - relay: "pipes" - relay_timeout: "20s" - -logs: - mode: development - level: error - -http: - address: 127.0.0.1:44933 - max_request_size: 1024 - middleware: ["gzip", "headers"] - uploads: - forbid: [".php", ".exe", ".bat"] - trusted_subnets: - [ - "10.0.0.0/8", - "127.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "::1/128", - "fc00::/7", - "fe80::/10", - ] - pool: - num_workers: 6 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -# boltdb simple driver -memcached: - addr: - - "localhost:11211"
\ No newline at end of file diff --git a/tests/plugins/kv/memcached/plugin_test.go b/tests/plugins/kv/memcached/plugin_test.go deleted file mode 100644 index ecbc7722..00000000 --- a/tests/plugins/kv/memcached/plugin_test.go +++ /dev/null @@ -1,195 +0,0 @@ -package memcached_test - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" - httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" - "github.com/spiral/roadrunner/v2/plugins/logger" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/stretchr/testify/assert" -) - -func TestMemcache(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-init.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &memcached.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &server.Plugin{}, - &httpPlugin.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("testMemcachedRPCMethods", testRPCMethods) - stopCh <- struct{}{} - wg.Wait() - - _ = os.Remove("rr") -} - -func testRPCMethods(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - var setRes bool - items := make([]kv.Item, 0, 5) - items = append(items, kv.Item{ - Key: "a", - Value: "aa", - }) - items = append(items, kv.Item{ - Key: "b", - Value: "bb", - }) - // add 5 second ttl - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - items = append(items, kv.Item{ - Key: "c", - Value: "cc", - TTL: tt, - }) - - items = append(items, kv.Item{ - Key: "d", - Value: "dd", - }) - - items = append(items, kv.Item{ - Key: "e", - Value: "ee", - }) - - // Register 3 keys with values - err = client.Call("memcached.Set", items, &setRes) - assert.NoError(t, err) - assert.True(t, setRes) - - ret := make(map[string]bool) - keys := []string{"a", "b", "c"} - err = client.Call("memcached.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = make(map[string]bool) - err = client.Call("memcached.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 - - mGet := make(map[string]interface{}) - keys = []string{"a", "b", "c"} - err = client.Call("memcached.MGet", keys, &mGet) - assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, string("aa"), string(mGet["a"].([]byte))) - assert.Equal(t, string("bb"), string(mGet["b"].([]byte))) - - mExpKeys := make([]kv.Item, 0, 2) - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2}) - mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2}) - mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2}) - - // MEXPIRE - var mExpRes bool - err = client.Call("memcached.MExpire", mExpKeys, &mExpRes) - assert.NoError(t, err) - assert.True(t, mExpRes) - - // TTL call is not supported for the memcached driver - keys = []string{"a", "b", "d"} - ttlRes := make(map[string]interface{}) - err = client.Call("memcached.TTL", keys, &ttlRes) - assert.Error(t, err) - assert.Len(t, ttlRes, 0) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = make(map[string]bool) - keys = []string{"a", "b", "d"} - err = client.Call("memcached.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 0) - - // DELETE - keys = []string{"e"} - var delRet bool - err = client.Call("memcached.Delete", keys, &delRet) - assert.NoError(t, err) - assert.True(t, delRet) - - // HAS AFTER DELETE - ret = make(map[string]bool) - keys = []string{"e"} - err = client.Call("memcached.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 0) -} diff --git a/tests/plugins/kv/memory/configs/.rr-init.yaml b/tests/plugins/kv/memory/configs/.rr-init.yaml deleted file mode 100644 index 8780a622..00000000 --- a/tests/plugins/kv/memory/configs/.rr-init.yaml +++ /dev/null @@ -1,42 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../../psr-worker-bench.php" - user: "" - group: "" - env: - "RR_HTTP": "true" - relay: "pipes" - relay_timeout: "20s" - -logs: - mode: development - level: error - -http: - address: 127.0.0.1:44933 - max_request_size: 1024 - middleware: ["gzip", "headers"] - uploads: - forbid: [".php", ".exe", ".bat"] - trusted_subnets: - [ - "10.0.0.0/8", - "127.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "::1/128", - "fc00::/7", - "fe80::/10", - ] - pool: - num_workers: 6 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - -# in memory KV driver -memory: - # keys ttl check interval - interval: 1 diff --git a/tests/plugins/kv/memory/plugin_test.go b/tests/plugins/kv/memory/plugin_test.go deleted file mode 100644 index 23d23bc0..00000000 --- a/tests/plugins/kv/memory/plugin_test.go +++ /dev/null @@ -1,195 +0,0 @@ -package memory_test - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" - httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" - "github.com/spiral/roadrunner/v2/plugins/logger" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/stretchr/testify/assert" -) - -func TestInMemory(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-init.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &memory.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &server.Plugin{}, - &httpPlugin.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("testInMemoryRPCMethods", testRPCMethods) - stopCh <- struct{}{} - wg.Wait() - - _ = os.Remove("rr") -} - -func testRPCMethods(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - var setRes bool - items := make([]kv.Item, 0, 5) - items = append(items, kv.Item{ - Key: "a", - Value: "aa", - }) - items = append(items, kv.Item{ - Key: "b", - Value: "bb", - }) - // add 5 second ttl - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - items = append(items, kv.Item{ - Key: "c", - Value: "cc", - TTL: tt, - }) - - items = append(items, kv.Item{ - Key: "d", - Value: "dd", - }) - - items = append(items, kv.Item{ - Key: "e", - Value: "ee", - }) - - // Register 3 keys with values - err = client.Call("memory.Set", items, &setRes) - assert.NoError(t, err) - assert.True(t, setRes) - - ret := make(map[string]bool) - keys := []string{"a", "b", "c"} - err = client.Call("memory.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = make(map[string]bool) - err = client.Call("memory.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 - - mGet := make(map[string]interface{}) - keys = []string{"a", "b", "c"} - err = client.Call("memory.MGet", keys, &mGet) - assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, string("aa"), mGet["a"].(string)) - assert.Equal(t, string("bb"), mGet["b"].(string)) - - mExpKeys := make([]kv.Item, 0, 2) - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2}) - mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2}) - mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2}) - - // MEXPIRE - var mExpRes bool - err = client.Call("memory.MExpire", mExpKeys, &mExpRes) - assert.NoError(t, err) - assert.True(t, mExpRes) - - // TTL - keys = []string{"a", "b", "d"} - ttlRes := make(map[string]interface{}) - err = client.Call("memory.TTL", keys, &ttlRes) - assert.NoError(t, err) - assert.Len(t, ttlRes, 3) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = make(map[string]bool) - keys = []string{"a", "b", "d"} - err = client.Call("memory.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 0) - - // DELETE - keys = []string{"e"} - var delRet bool - err = client.Call("memory.Delete", keys, &delRet) - assert.NoError(t, err) - assert.True(t, delRet) - - // HAS AFTER DELETE - ret = make(map[string]bool) - keys = []string{"e"} - err = client.Call("memory.Has", keys, &ret) - assert.NoError(t, err) - assert.Len(t, ret, 0) -} diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 6d270d3f..0b42e19d 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -30,7 +30,7 @@ func makePayload(b *flatbuffers.Builder, storage string, items []kv.Item) []byte storageOffset := b.CreateString(storage) - ////////////////////// ITEMS VECTOR //////////////////////////// + // //////////////////// ITEMS VECTOR //////////////////////////// offset := make([]flatbuffers.UOffsetT, len(items)) for i := len(items) - 1; i >= 0; i-- { offset[i] = serializeItems(b, items[i]) @@ -43,7 +43,7 @@ func makePayload(b *flatbuffers.Builder, storage string, items []kv.Item) []byte } itemsOffset := b.EndVector(len(offset)) - /////////////////////////////////////////////////////////////////// + // ///////////////////////////////////////////////////////////////// generated.PayloadStart(b) generated.PayloadAddItems(b, itemsOffset) @@ -187,3 +187,605 @@ func kvHasTest(t *testing.T) { err = client.Call("kv.Has", args, &ret) assert.NoError(t, err) } + +func TestBoltDb(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-boltdb.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &kv.Plugin{}, + &boltdb.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testBoltDbRPCMethods", testRPCMethods) + stopCh <- struct{}{} + wg.Wait() + + _ = os.Remove("rr.db") +} + +func testRPCMethods(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + keys := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, + }) + data := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + TTL: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, + }) + + var setRes bool + + // Register 3 keys with values + err = client.Call("kv.Set", data, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + err = client.Call("kv.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, "aa", mGet["a"].(string)) + assert.Equal(t, "bb", mGet["b"].(string)) + + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + data2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ + { + Key: "a", + TTL: tt2, + }, + { + Key: "b", + TTL: tt2, + }, + { + Key: "d", + TTL: tt2, + }, + }) + + // MEXPIRE + var mExpRes bool + err = client.Call("kv.MExpire", data2, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL + keys2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, + }) + ttlRes := make(map[string]interface{}) + err = client.Call("kv.TTL", keys2, &ttlRes) + assert.NoError(t, err) + assert.Len(t, ttlRes, 3) + + // HAS AFTER TTL + time.Sleep(time.Second * 15) + ret = make(map[string]bool) + err = client.Call("kv.Has", keys2, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keysDel := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ + { + Key: "e", + }, + }) + var delRet bool + err = client.Call("kv.Delete", keysDel, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + err = client.Call("kv.Has", keysDel, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} + +func TestMemcached(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-memcached.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &kv.Plugin{}, + &memcached.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testMemcachedRPCMethods", testRPCMethodsMemcached) + stopCh <- struct{}{} + wg.Wait() +} + +func testRPCMethodsMemcached(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + keys := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, + }) + data := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + TTL: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, + }) + + var setRes bool + + // Register 3 keys with values + err = client.Call("kv.Set", data, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + err = client.Call("kv.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, string("aa"), string(mGet["a"].([]byte))) + assert.Equal(t, string("bb"), string(mGet["b"].([]byte))) + + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + data2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ + { + Key: "a", + TTL: tt2, + }, + { + Key: "b", + TTL: tt2, + }, + { + Key: "d", + TTL: tt2, + }, + }) + + // MEXPIRE + var mExpRes bool + err = client.Call("kv.MExpire", data2, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL call is not supported for the memcached driver + keys2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, + }) + ttlRes := make(map[string]interface{}) + err = client.Call("kv.TTL", keys2, &ttlRes) + assert.Error(t, err) + assert.Len(t, ttlRes, 0) + + // HAS AFTER TTL + time.Sleep(time.Second * 15) + ret = make(map[string]bool) + err = client.Call("kv.Has", keys2, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keysDel := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ + { + Key: "e", + }, + }) + var delRet bool + err = client.Call("kv.Delete", keysDel, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + err = client.Call("kv.Has", keysDel, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} + +func TestInMemory(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-in-memory.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &kv.Plugin{}, + &memory.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testInMemoryRPCMethods", testRPCMethodsInMemory) + stopCh <- struct{}{} + wg.Wait() +} + +func testRPCMethodsInMemory(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + keys := makePayload(flatbuffers.NewBuilder(100), "default", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, + }) + data := makePayload(flatbuffers.NewBuilder(100), "default", []kv.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + TTL: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, + }) + + var setRes bool + + // Register 3 keys with values + err = client.Call("kv.Set", data, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + err = client.Call("kv.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, "aa", mGet["a"].(string)) + assert.Equal(t, "bb", mGet["b"].(string)) + + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + data2 := makePayload(flatbuffers.NewBuilder(100), "default", []kv.Item{ + { + Key: "a", + TTL: tt2, + }, + { + Key: "b", + TTL: tt2, + }, + { + Key: "d", + TTL: tt2, + }, + }) + + // MEXPIRE + var mExpRes bool + err = client.Call("kv.MExpire", data2, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL + keys2 := makePayload(flatbuffers.NewBuilder(100), "default", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, + }) + ttlRes := make(map[string]interface{}) + err = client.Call("kv.TTL", keys2, &ttlRes) + assert.NoError(t, err) + assert.Len(t, ttlRes, 3) + + // HAS AFTER TTL + time.Sleep(time.Second * 15) + ret = make(map[string]bool) + err = client.Call("kv.Has", keys2, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keysDel := makePayload(flatbuffers.NewBuilder(100), "default", []kv.Item{ + { + Key: "e", + }, + }) + var delRet bool + err = client.Call("kv.Delete", keysDel, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + err = client.Call("kv.Has", keysDel, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} |