diff options
author | Valery Piashchynski <[email protected]> | 2021-01-07 01:06:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-07 01:06:50 +0300 |
commit | c1465d3bcdf24a78440300aa51e7cfc92ce874a8 (patch) | |
tree | 6e0f5107eba90df73724b6611ca6adfa148d2a3f | |
parent | c9f670ee734355cbc5d504186946b7db67cf62b5 (diff) |
KV, updated, bug fixed, with intergration tests via plugins
-rw-r--r-- | .github/workflows/build.yml | 2 | ||||
-rw-r--r-- | .vscode/launch.json | 16 | ||||
-rw-r--r-- | .vscode/settings.json | 1 | ||||
-rwxr-xr-x | Makefile | 8 | ||||
-rwxr-xr-x | go.sum | 21 | ||||
-rw-r--r-- | plugins/kv/boltdb/config.go | 11 | ||||
-rw-r--r-- | plugins/kv/boltdb/plugin.go | 148 | ||||
-rw-r--r-- | plugins/kv/boltdb/plugin_unit_test.go | 172 | ||||
-rw-r--r-- | plugins/kv/interface.go | 20 | ||||
-rw-r--r-- | plugins/kv/memcached/plugin.go | 25 | ||||
-rw-r--r-- | plugins/kv/memcached/storage_test.go | 108 | ||||
-rw-r--r-- | plugins/kv/memory/plugin.go (renamed from plugins/kv/memory/storage.go) | 31 | ||||
-rw-r--r-- | plugins/kv/memory/storage_test.go | 114 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 110 | ||||
-rwxr-xr-x | rr | bin | 0 -> 18108416 bytes | |||
-rw-r--r-- | tests/docker-compose.yaml | 7 | ||||
-rw-r--r-- | tests/plugins/kv/boltdb/configs/.rr-init.yaml | 46 | ||||
-rw-r--r-- | tests/plugins/kv/boltdb/plugin_test.go | 195 | ||||
-rw-r--r-- | tests/plugins/kv/memcached/configs/.rr-init.yaml | 43 | ||||
-rw-r--r-- | tests/plugins/kv/memcached/plugin_test.go | 195 | ||||
-rw-r--r-- | tests/plugins/kv/memory/configs/.rr-init.yaml | 42 | ||||
-rw-r--r-- | tests/plugins/kv/memory/plugin_test.go | 195 |
22 files changed, 1194 insertions, 316 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b0cd285d..4c57125d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -65,6 +65,7 @@ jobs: - name: Run golang tests on Windows without codecov if: ${{ matrix.os == 'windows-latest' }} run: | + docker-compose -f ./tests/docker-compose.yaml up -d go test -v -race -cover -tags=debug ./utils go test -v -race -cover -tags=debug ./pkg/pipe go test -v -race -cover -tags=debug ./pkg/pool @@ -88,6 +89,7 @@ jobs: - name: Run golang tests on Linux and MacOS if: ${{ matrix.os != 'windows-latest' }} run: | + docker-compose -f ./tests/docker-compose.yaml up -d mkdir ./coverage-ci go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/utils.txt -covermode=atomic ./utils go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe diff --git a/.vscode/launch.json b/.vscode/launch.json index 3cf21fed..f43ef860 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,6 +5,13 @@ "version": "0.2.0", "configurations": [ { + "name": "Launch test file", + "type": "go", + "request": "launch", + "mode": "test", + "program": "${file}" + }, + { "name": "Launch main debug, race", "type": "go", "request": "launch", @@ -13,13 +20,6 @@ "buildFlags": "-race", "args": ["serve", "-c", "../.rr.yaml"], "program": "${workspaceFolder}/cmd/main.go" - }, - { - "name": "Launch file", - "type": "go", - "request": "launch", - "mode": "debug", - "program": "${file}" - }, + } ] }
\ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 70a50b98..78560788 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ "cSpell.words": [ "asdf", "bbolt", + "gofiber", "stopc", "treshholdc" ] @@ -25,6 +25,7 @@ uninstall: ## Uninstall locally installed RR test: ## Run application tests #go clean -testcache + docker-compose -f docker-compose.yaml up -d go test -v -race -cover -tags=debug -covermode=atomic ./utils go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool @@ -46,9 +47,16 @@ test: ## Run application tests go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/static go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memory + docker-compose down lint: ## Run application linters golangci-lint run kv: + docker-compose -f tests/docker-compose.yaml up -d go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memory + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memcached + go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/boltdb + go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/memory + go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/memcached + docker-compose down @@ -60,6 +60,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/containerd/containerd v1.4.3 h1:ijQT13JedHSHrQGWFcGEwzcNKrAGIiZ+jSD5QQG07SY= +github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -75,6 +77,15 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= +github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo= +github.com/docker/docker v20.10.2+incompatible h1:vFgEHPqWBTp4pTjdLwjAA4bSo3gvIGOYwuJTlEjVBCw= +github.com/docker/docker v20.10.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -103,6 +114,7 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gofiber/fiber/v2 v2.3.0 h1:82ufvLne0cxzdkDOeLkUmteA+z1uve9JQ/ZFsMOnkzc= github.com/gofiber/fiber/v2 v2.3.0/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -244,6 +256,10 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= +github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -290,6 +306,7 @@ github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -328,6 +345,8 @@ github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U= github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.7 h1:GRN7Sjk4yVavD2W+1fUWBjqoivWQsnbsXbX7xyhZhbU= github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/goridge v1.0.4 h1:qnYtI84H0tcYjcbFdFl/VUFQZ0YUE9p+VuU8In4kC/8= +github.com/spiral/goridge v2.1.4+incompatible h1:L15TKrbPEp/G6JfS3jjuvY6whkhfD292XX+1iy9mO2k= github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc= github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4= github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= @@ -530,10 +549,12 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= +google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go index 6b116611..b2e1e636 100644 --- a/plugins/kv/boltdb/config.go +++ b/plugins/kv/boltdb/config.go @@ -8,16 +8,17 @@ type Config struct { File string // Bucket to store data in boltDB Bucket string - + // db file permissions Permissions int - TTL int + // timeout + Interval uint `yaml:"interval"` } -func (s *Config) InitDefaults() error { +// InitDefaults initializes default values for the boltdb +func (s *Config) InitDefaults() { s.Dir = "." // current dir s.Bucket = "rr" // default bucket name s.File = "rr.db" // default file name s.Permissions = 0777 // free for all - s.TTL = 60 // 60 seconds is default TTL - return nil + s.Interval = 60 // default is 60 seconds timeout } diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go index e5eda0c2..6cfc49f6 100644 --- a/plugins/kv/boltdb/plugin.go +++ b/plugins/kv/boltdb/plugin.go @@ -2,7 +2,6 @@ package boltdb import ( "bytes" - "context" "encoding/gob" "os" "path" @@ -41,78 +40,23 @@ type Plugin struct { stop chan struct{} } -// NewBoltClient instantiate new BOLTDB client -// The parameters are: -// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created -// perm os.FileMode -- file permissions, for example 0777 -// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other -// bucket string -- name of the bucket to use, should be UTF-8 -func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { - const op = errors.Op("newBoltClient") - db, err := bolt.Open(path, perm, options) - if err != nil { - return nil, errors.E(op, err) - } - - // bucket should be SET - if bucket == "" { - return nil, errors.E(op, errors.Str("bucket should be set")) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - // if TTL is not set, make it default - if ttl == 0 { - ttl = time.Minute - } - - s := &Plugin{ - DB: db, - bucket: []byte(bucket), - stop: make(chan struct{}), - timeout: ttl, - gc: &sync.Map{}, - } - - // start the TTL gc - go s.gcPhase() - - return s, nil -} - func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { const op = errors.Op("boltdb plugin init") s.cfg = &Config{} + s.cfg.InitDefaults() + err := cfg.UnmarshalKey(PluginName, &s.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } + // set the logger s.log = log - return nil -} - -func (s *Plugin) Serve() chan error { - const op = errors.Op("boltdb serve") - errCh := make(chan error, 1) - db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil) if err != nil { - errCh <- errors.E(op, err) - return errCh + return errors.E(op, err) } // create bucket if it does not exist @@ -125,24 +69,29 @@ func (s *Plugin) Serve() chan error { } return nil }) + if err != nil { - errCh <- err - return errCh + return errors.E(op, err) } s.DB = db s.bucket = []byte(s.cfg.Bucket) s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.TTL) * time.Second + s.timeout = time.Duration(s.cfg.Interval) * time.Second s.gc = &sync.Map{} + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) // start the TTL gc go s.gcPhase() return errCh } -func (s Plugin) Stop() error { +func (s *Plugin) Stop() error { const op = errors.Op("boltdb stop") err := s.Close() if err != nil { @@ -151,8 +100,9 @@ func (s Plugin) Stop() error { return nil } -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("boltdb Has") + s.log.Debug("boltdb HAS method called", "args", keys) if keys == nil { return nil, errors.E(op, errors.NoKeys) } @@ -164,8 +114,8 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { return errors.E(op, errors.EmptyKey) } @@ -173,24 +123,25 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error if b == nil { return errors.E(op, errors.NoSuchBucket) } - exist := b.Get([]byte(key)) + exist := b.Get([]byte(keys[i])) if exist != nil { - m[key] = true + m[keys[i]] = true } } return nil }) if err != nil { - return nil, err + return nil, errors.E(op, err) } + s.log.Debug("boltdb HAS method finished") return m, nil } // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s *Plugin) Get(key string) ([]byte, error) { const op = errors.Op("boltdb Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -211,7 +162,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { buf := bytes.NewReader(val) decoder := gob.NewDecoder(buf) - i := kv.Item{} + var i string err := decoder.Decode(&i) if err != nil { // unsafe (w/o runes) convert @@ -219,7 +170,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { } // set the value - val = []byte(i.Value) + val = []byte(i) } return nil }) @@ -230,7 +181,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { return val, nil } -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("boltdb MGet") // defence if keys == nil { @@ -238,8 +189,8 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ } // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { return nil, errors.E(op, errors.EmptyKey) } @@ -253,10 +204,22 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ return errors.E(op, errors.NoSuchBucket) } - for _, key := range keys { - value := b.Get([]byte(key)) + buf := new(bytes.Buffer) + var out string + buf.Grow(100) + for i := range keys { + value := b.Get([]byte(keys[i])) + buf.Write(value) + // allocate enough space + dec := gob.NewDecoder(buf) if value != nil { - m[key] = value + err := dec.Decode(&out) + if err != nil { + return errors.E(op, err) + } + m[keys[i]] = out + buf.Reset() + out = "" } } @@ -270,7 +233,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ } // Set puts the K/V to the bolt -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) Set(items ...kv.Item) error { const op = errors.Op("boltdb Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -303,7 +266,8 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { return errors.E(op, errors.EmptyItem) } - err = encoder.Encode(&items[i]) + // Encode value + err = encoder.Encode(&items[i].Value) if err != nil { return errors.E(op, err) } @@ -321,6 +285,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { if err != nil { return errors.E(op, err) } + // Store key TTL in the separate map s.gc.Store(items[i].Key, items[i].TTL) } @@ -331,7 +296,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { } // Delete all keys from DB -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s *Plugin) Delete(keys ...string) error { const op = errors.Op("boltdb Delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -378,7 +343,7 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("boltdb MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -396,7 +361,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { return nil } -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("boltdb TTL") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -422,15 +387,25 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{} } // Close the DB connection -func (s Plugin) Close() error { +func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} return s.DB.Close() } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} + // ========================= PRIVATE ================================= -func (s Plugin) gcPhase() { +func (s *Plugin) gcPhase() { t := time.NewTicker(s.timeout) defer t.Stop() for { @@ -449,6 +424,7 @@ func (s Plugin) gcPhase() { if now.After(v) { // time expired s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) err := s.DB.Update(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucket) if b == nil { diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go index a12c830d..2459e493 100644 --- a/plugins/kv/boltdb/plugin_unit_test.go +++ b/plugins/kv/boltdb/plugin_unit_test.go @@ -1,19 +1,74 @@ package boltdb import ( - "context" "os" "strconv" "sync" "testing" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/stretchr/testify/assert" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" ) +// NewBoltClient instantiate new BOLTDB client +// The parameters are: +// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created +// perm os.FileMode -- file permissions, for example 0777 +// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other +// bucket string -- name of the bucket to use, should be UTF-8 +func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { + const op = errors.Op("newBoltClient") + db, err := bolt.Open(path, perm, options) + if err != nil { + return nil, errors.E(op, err) + } + + // bucket should be SET + if bucket == "" { + return nil, errors.E(op, errors.Str("bucket should be set")) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists([]byte(bucket)) + if err != nil { + return errors.E(op, err) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + // if TTL is not set, make it default + if ttl == 0 { + ttl = time.Minute + } + + l, _ := zap.NewDevelopment() + s := &Plugin{ + DB: db, + bucket: []byte(bucket), + stop: make(chan struct{}), + timeout: ttl, + gc: &sync.Map{}, + log: logger.NewZapAdapter(l), + } + + // start the TTL gc + go s.gcPhase() + + return s, nil +} + func initStorage() kv.Storage { - storage, err := NewBoltClient("rr.db", 0777, nil, "rr", time.Second) + storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second) if err != nil { panic(err) } @@ -36,7 +91,7 @@ func TestStorage_Has(t *testing.T) { cleanup(t, "rr.db") }() - v, err := s.Has(context.Background(), "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) } @@ -50,13 +105,12 @@ func TestStorage_Has_Set_Has(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -66,7 +120,7 @@ func TestStorage_Has_Set_Has(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -82,13 +136,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -98,7 +151,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -115,7 +168,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { m.Lock() // set is writable transaction // it should stop readable - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key" + strconv.Itoa(i), Value: "hello world" + strconv.Itoa(i), TTL: "", @@ -133,7 +186,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.RLock() - v, err = s.Has(ctx, "key") + v, err = s.Has("key") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -146,7 +199,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.Lock() - err = s.Delete(ctx, "key"+strconv.Itoa(i)) + err = s.Delete("key" + strconv.Itoa(i)) assert.NoError(t, err) m.Unlock() } @@ -164,13 +217,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -180,13 +232,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } @@ -200,13 +252,12 @@ func TestStorage_Has_Set_Get(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -216,13 +267,13 @@ func TestStorage_Has_Set_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) - // no such key + assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.Get(ctx, "key") + res, err := s.Get("key") assert.NoError(t, err) if string(res) != "hello world" { @@ -239,13 +290,12 @@ func TestStorage_Set_Del_Get(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -255,20 +305,20 @@ func TestStorage_Set_Del_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) // check that keys are present - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) - assert.NoError(t, s.Delete(ctx, "key", "key2")) + assert.NoError(t, s.Delete("key", "key2")) // check that keys are not present - res, err = s.MGet(ctx, "key", "key2") + res, err = s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 0) } @@ -281,13 +331,12 @@ func TestStorage_Set_GetM(t *testing.T) { } cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -297,14 +346,13 @@ func TestStorage_Set_GetM(t *testing.T) { TTL: "", })) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } func TestNilAndWrongArgs(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -313,61 +361,60 @@ func TestNilAndWrongArgs(t *testing.T) { }() // check - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - _, err = s.Has(ctx, "") + _, err = s.Has("") assert.Error(t, err) - _, err = s.Get(ctx, "") + _, err = s.Get("") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", "") + _, err = s.MGet("key", "key2", "") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", " ") + _, err = s.MGet("key", "key2", " ") assert.Error(t, err) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", })) - assert.Error(t, s.Set(ctx, kv.Item{ + assert.Error(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "asdf", })) - _, err = s.Has(ctx, "key") + _, err = s.Has("key") assert.NoError(t, err) - assert.Error(t, s.Set(ctx, kv.Item{})) + assert.Error(t, s.Set(kv.Item{})) - err = s.Delete(ctx, "") + err = s.Delete("") assert.Error(t, err) - err = s.Delete(ctx, "key", "") + err = s.Delete("key", "") assert.Error(t, err) - err = s.Delete(ctx, "key", " ") + err = s.Delete("key", " ") assert.Error(t, err) - err = s.Delete(ctx, "key") + err = s.Delete("key") assert.NoError(t, err) } func TestStorage_MExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -376,12 +423,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -404,12 +451,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { Value: "", TTL: nowPlusFive, } - assert.NoError(t, s.MExpire(ctx, i1, i2)) + assert.NoError(t, s.MExpire(i1, i2)) time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -417,7 +464,6 @@ func TestStorage_MExpire_TTL(t *testing.T) { func TestStorage_SetExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -426,12 +472,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -445,7 +491,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) // set timeout to 5 sec - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: nowPlusFive, @@ -457,7 +503,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { })) time.Sleep(time.Second * 2) - m, err := s.TTL(ctx, "key", "key2") + m, err := s.TTL("key", "key2") assert.NoError(t, err) // remove a precision 4.02342342 -> 4 @@ -478,7 +524,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { time.Sleep(time.Second * 4) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 3512fd73..c1367cdf 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,10 +1,6 @@ package kv // Item represents general storage item -import ( - "context" -) - type Item struct { // Key of item Key string @@ -17,28 +13,28 @@ type Item struct { // Storage represents single abstract storage. type Storage interface { // Has checks if value exists. - Has(ctx context.Context, keys ...string) (map[string]bool, error) + Has(keys ...string) (map[string]bool, error) // Get loads value content into a byte slice. - Get(ctx context.Context, key string) ([]byte, error) + Get(key string) ([]byte, error) // MGet loads content of multiple values - // If there are no values for keys, key will no be in the map - MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) + // Returns the map with existing keys and associated values + MGet(keys ...string) (map[string]interface{}, error) // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(ctx context.Context, items ...Item) error + Set(items ...Item) error // MExpire sets the TTL for multiply keys - MExpire(ctx context.Context, items ...Item) error + MExpire(items ...Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb - TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) + TTL(keys ...string) (map[string]interface{}, error) // Delete one or multiple keys. - Delete(ctx context.Context, keys ...string) error + Delete(keys ...string) error // Close closes the storage and underlying resources. Close() error diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go index 69f96bfe..f5111c04 100644 --- a/plugins/kv/memcached/plugin.go +++ b/plugins/kv/memcached/plugin.go @@ -1,7 +1,6 @@ package memcached import ( - "context" "strings" "time" @@ -58,8 +57,18 @@ func (s *Plugin) Stop() error { return nil } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + // Has checks the key for existence -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("memcached Has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -84,7 +93,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error // Get gets the item for the given key. ErrCacheMiss is returned for a // memcache cache miss. The key must be at most 250 bytes in length. -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s Plugin) Get(key string) ([]byte, error) { const op = errors.Op("memcached Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -106,7 +115,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { // return map with key -- string // and map value as value -- []byte -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached MGet") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -141,7 +150,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s Plugin) Set(items ...kv.Item) error { const op = errors.Op("memcached Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -182,7 +191,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("memcached MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -209,12 +218,12 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { } // return time in seconds (int32) for a given keys -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached HTTLas") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s Plugin) Delete(keys ...string) error { const op = errors.Op("memcached Has") if keys == nil { return errors.E(op, errors.NoKeys) diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go index 4b59bbd0..3d37748b 100644 --- a/plugins/kv/memcached/storage_test.go +++ b/plugins/kv/memcached/storage_test.go @@ -1,7 +1,6 @@ package memcached import ( - "context" "strconv" "sync" "testing" @@ -16,7 +15,7 @@ func initStorage() kv.Storage { } func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(context.Background(), keys...) + err := s.Delete(keys...) if err != nil { t.Fatalf("error during cleanup: %s", err.Error()) } @@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) { func TestStorage_Has(t *testing.T) { s := initStorage() - ctx := context.Background() - - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) } @@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -57,7 +53,7 @@ func TestStorage_Has_Set_Has(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -73,13 +69,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -89,13 +84,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } @@ -109,13 +104,12 @@ func TestStorage_Has_Set_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -125,13 +119,13 @@ func TestStorage_Has_Set_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.Get(ctx, "key") + res, err := s.Get("key") assert.NoError(t, err) if string(res) != "hello world" { @@ -148,13 +142,12 @@ func TestStorage_Set_Del_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -164,27 +157,26 @@ func TestStorage_Set_Del_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) // check that keys are present - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) - assert.NoError(t, s.Delete(ctx, "key", "key2")) + assert.NoError(t, s.Delete("key", "key2")) // check that keys are not present - res, err = s.MGet(ctx, "key", "key2") + res, err = s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 0) } func TestStorage_Set_GetM(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") @@ -194,11 +186,11 @@ func TestStorage_Set_GetM(t *testing.T) { } }() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -208,14 +200,13 @@ func TestStorage_Set_GetM(t *testing.T) { TTL: "", })) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } func TestStorage_MExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") if err := s.Close(); err != nil { @@ -224,12 +215,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -252,12 +243,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { Value: "", TTL: nowPlusFive, } - assert.NoError(t, s.MExpire(ctx, i1, i2)) + assert.NoError(t, s.MExpire(i1, i2)) time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -265,7 +256,6 @@ func TestStorage_MExpire_TTL(t *testing.T) { func TestNilAndWrongArgs(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key") if err := s.Close(); err != nil { @@ -274,46 +264,45 @@ func TestNilAndWrongArgs(t *testing.T) { }() // check - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - _, err = s.Has(ctx, "") + _, err = s.Has("") assert.Error(t, err) - _, err = s.Get(ctx, "") + _, err = s.Get("") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", "") + _, err = s.MGet("key", "key2", "") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", " ") + _, err = s.MGet("key", "key2", " ") assert.Error(t, err) - assert.Error(t, s.Set(ctx, kv.Item{})) + assert.Error(t, s.Set(kv.Item{})) - err = s.Delete(ctx, "") + err = s.Delete("") assert.Error(t, err) - err = s.Delete(ctx, "key", "") + err = s.Delete("key", "") assert.Error(t, err) - err = s.Delete(ctx, "key", " ") + err = s.Delete("key", " ") assert.Error(t, err) - err = s.Delete(ctx, "key") + err = s.Delete("key") assert.NoError(t, err) } func TestStorage_SetExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") if err := s.Close(); err != nil { @@ -322,12 +311,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -341,7 +330,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) // set timeout to 5 sec - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: nowPlusFive, @@ -355,7 +344,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -370,13 +359,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -386,7 +374,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -403,7 +391,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { m.Lock() // set is writable transaction // it should stop readable - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key" + strconv.Itoa(i), Value: "hello world" + strconv.Itoa(i), TTL: "", @@ -421,7 +409,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.RLock() - v, err = s.Has(ctx, "key") + v, err = s.Has("key") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -434,7 +422,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.Lock() - err = s.Delete(ctx, "key"+strconv.Itoa(i)) + err = s.Delete("key" + strconv.Itoa(i)) assert.NoError(t, err) m.Unlock() } diff --git a/plugins/kv/memory/storage.go b/plugins/kv/memory/plugin.go index f4bdacea..2c65f14c 100644 --- a/plugins/kv/memory/storage.go +++ b/plugins/kv/memory/plugin.go @@ -1,7 +1,6 @@ package memory import ( - "context" "strings" "sync" "time" @@ -49,7 +48,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (s Plugin) Serve() chan error { +func (s *Plugin) Serve() chan error { errCh := make(chan error, 1) // start in-memory gc for kv go s.gc() @@ -57,7 +56,7 @@ func (s Plugin) Serve() chan error { return errCh } -func (s Plugin) Stop() error { +func (s *Plugin) Stop() error { const op = errors.Op("in-memory storage stop") err := s.Close() if err != nil { @@ -66,7 +65,7 @@ func (s Plugin) Stop() error { return nil } -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in-memory storage Has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -86,7 +85,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error return m, nil } -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s *Plugin) Get(key string) ([]byte, error) { const op = errors.Op("in-memory storage Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -102,7 +101,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { return nil, nil } -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("in-memory storage MGet") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -127,7 +126,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ return m, nil } -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) Set(items ...kv.Item) error { const op = errors.Op("in-memory storage Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -150,7 +149,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("in-memory storage MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -179,7 +178,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { return nil } -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("in-memory storage TTL") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -203,7 +202,7 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{} return m, nil } -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s *Plugin) Delete(keys ...string) error { const op = errors.Op("in-memory storage Delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -224,11 +223,21 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error { } // Close clears the in-memory storage -func (s Plugin) Close() error { +func (s *Plugin) Close() error { s.stop <- struct{}{} return nil } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + // ================================== PRIVATE ====================================== func (s *Plugin) gc() { diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go index b7b46637..4b30460d 100644 --- a/plugins/kv/memory/storage_test.go +++ b/plugins/kv/memory/storage_test.go @@ -1,7 +1,6 @@ package memory import ( - "context" "strconv" "sync" "testing" @@ -16,7 +15,7 @@ func initStorage() kv.Storage { } func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(context.Background(), keys...) + err := s.Delete(keys...) if err != nil { t.Fatalf("error during cleanup: %s", err.Error()) } @@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) { func TestStorage_Has(t *testing.T) { s := initStorage() - ctx := context.Background() - - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) } @@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -58,7 +54,7 @@ func TestStorage_Has_Set_Has(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -74,13 +70,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -91,13 +86,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } @@ -111,13 +106,12 @@ func TestStorage_Has_Set_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -128,13 +122,13 @@ func TestStorage_Has_Set_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.Get(ctx, "key") + res, err := s.Get("key") assert.NoError(t, err) if string(res) != "value" { @@ -151,13 +145,12 @@ func TestStorage_Set_Del_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -168,27 +161,26 @@ func TestStorage_Set_Del_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) // check that keys are present - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) - assert.NoError(t, s.Delete(ctx, "key", "key2")) - // check that keys are not presentps -eo state,uid,pid,ppid,rtprio,time,comm - res, err = s.MGet(ctx, "key", "key2") + assert.NoError(t, s.Delete("key", "key2")) + // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm + res, err = s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 0) } func TestStorage_Set_GetM(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") @@ -198,11 +190,11 @@ func TestStorage_Set_GetM(t *testing.T) { } }() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -213,14 +205,13 @@ func TestStorage_Set_GetM(t *testing.T) { TTL: "", })) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } func TestStorage_MExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") @@ -230,12 +221,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -258,12 +249,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { Value: "", TTL: nowPlusFive, } - assert.NoError(t, s.MExpire(ctx, i1, i2)) + assert.NoError(t, s.MExpire(i1, i2)) time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -271,7 +262,6 @@ func TestStorage_MExpire_TTL(t *testing.T) { func TestNilAndWrongArgs(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -279,48 +269,47 @@ func TestNilAndWrongArgs(t *testing.T) { }() // check - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - _, err = s.Has(ctx, "") + _, err = s.Has("") assert.Error(t, err) - _, err = s.Get(ctx, "") + _, err = s.Get("") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", "") + _, err = s.MGet("key", "key2", "") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", " ") + _, err = s.MGet("key", "key2", " ") assert.Error(t, err) - assert.NoError(t, s.Set(ctx, kv.Item{})) - _, err = s.Has(ctx, "key") + assert.NoError(t, s.Set(kv.Item{})) + _, err = s.Has("key") assert.NoError(t, err) - err = s.Delete(ctx, "") + err = s.Delete("") assert.Error(t, err) - err = s.Delete(ctx, "key", "") + err = s.Delete("key", "") assert.Error(t, err) - err = s.Delete(ctx, "key", " ") + err = s.Delete("key", " ") assert.Error(t, err) - err = s.Delete(ctx, "key") + err = s.Delete("key") assert.NoError(t, err) } func TestStorage_SetExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") if err := s.Close(); err != nil { @@ -329,12 +318,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -348,7 +337,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) // set timeout to 5 sec - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: nowPlusFive, @@ -360,7 +349,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { })) time.Sleep(time.Second * 2) - m, err := s.TTL(ctx, "key", "key2") + m, err := s.TTL("key", "key2") assert.NoError(t, err) // remove a precision 4.02342342 -> 4 @@ -381,7 +370,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { time.Sleep(time.Second * 4) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -396,13 +385,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -412,7 +400,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -429,7 +417,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { m.Lock() // set is writable transaction // it should stop readable - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key" + strconv.Itoa(i), Value: "hello world" + strconv.Itoa(i), TTL: "", @@ -447,7 +435,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.RLock() - v, err = s.Has(ctx, "key") + v, err = s.Has("key") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -460,7 +448,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.Lock() - err = s.Delete(ctx, "key"+strconv.Itoa(i)) + err = s.Delete("key" + strconv.Itoa(i)) assert.NoError(t, err) m.Unlock() } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go new file mode 100644 index 00000000..751f0d12 --- /dev/null +++ b/plugins/kv/rpc.go @@ -0,0 +1,110 @@ +package kv + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Wrapper for the plugin +type RPCServer struct { + // svc is a plugin implementing Storage interface + svc Storage + // Logger + log logger.Logger +} + +// NewRPCServer construct RPC server for the particular plugin +func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { + return &RPCServer{ + svc: srv, + log: log, + } +} + +// data Data +func (r *RPCServer) Has(in []string, res *map[string]bool) error { + const op = errors.Op("rpc server Has") + ret, err := r.svc.Has(in...) + if err != nil { + return errors.E(op, err) + } + + // update the value in the pointer + *res = ret + return nil +} + +// in SetData +func (r *RPCServer) Set(in []Item, ok *bool) error { + const op = errors.Op("rpc server Set") + + err := r.svc.Set(in...) + if err != nil { + return errors.E(op, err) + } + + *ok = true + return nil +} + +// in Data +func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { + const op = errors.Op("rpc server MGet") + ret, err := r.svc.MGet(in...) + if err != nil { + return errors.E(op, err) + } + + // update return value + *res = ret + return nil +} + +// in Data +func (r *RPCServer) MExpire(in []Item, ok *bool) error { + const op = errors.Op("rpc server MExpire") + + err := r.svc.MExpire(in...) + if err != nil { + return errors.E(op, err) + } + + *ok = true + return nil +} + +// in Data +func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { + const op = errors.Op("rpc server TTL") + + ret, err := r.svc.TTL(in...) + if err != nil { + return errors.E(op, err) + } + + *res = ret + return nil +} + +// in Data +func (r *RPCServer) Delete(in []string, ok *bool) error { + const op = errors.Op("rpc server Delete") + err := r.svc.Delete(in...) + if err != nil { + return errors.E(op, err) + } + *ok = true + return nil +} + +// in string, storages +func (r *RPCServer) Close(storage string, ok *bool) error { + const op = errors.Op("rpc server Close") + err := r.svc.Close() + if err != nil { + return errors.E(op, err) + } + *ok = true + + return nil +} Binary files differdiff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml new file mode 100644 index 00000000..fd1a48bf --- /dev/null +++ b/tests/docker-compose.yaml @@ -0,0 +1,7 @@ +version: '3' + +services: + memcached: + image: memcached:latest + ports: + - "0.0.0.0:11211:11211"
\ No newline at end of file diff --git a/tests/plugins/kv/boltdb/configs/.rr-init.yaml b/tests/plugins/kv/boltdb/configs/.rr-init.yaml new file mode 100644 index 00000000..4629a24b --- /dev/null +++ b/tests/plugins/kv/boltdb/configs/.rr-init.yaml @@ -0,0 +1,46 @@ +rpc: + listen: tcp://127.0.0.1:6001 + disabled: false + +server: + command: "php ../../../psr-worker-bench.php" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relayTimeout: "20s" + +logs: + mode: development + level: debug + +http: + address: 127.0.0.1:44933 + maxRequestSize: 1024 + middleware: ["gzip", "headers"] + uploads: + forbid: [".php", ".exe", ".bat"] + trustedSubnets: + [ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + ] + pool: + numWorkers: 6 + maxJobs: 0 + allocateTimeout: 60s + destroyTimeout: 60s + +# boltdb simple driver +boltdb: + dir: "." + file: "rr" + bucket: "test" + permissions: 777 + interval: 1 # seconds diff --git a/tests/plugins/kv/boltdb/plugin_test.go b/tests/plugins/kv/boltdb/plugin_test.go new file mode 100644 index 00000000..ba9b695a --- /dev/null +++ b/tests/plugins/kv/boltdb/plugin_test.go @@ -0,0 +1,195 @@ +package boltdb_tests //nolint:golint,stylecheck + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/boltdb" + "github.com/spiral/roadrunner/v2/plugins/logger" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestBoltDb(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &boltdb.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testBoltDbRPCMethods", testRPCMethods) + stopCh <- struct{}{} + wg.Wait() + + _ = os.Remove("rr") +} + +func testRPCMethods(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var setRes bool + items := make([]kv.Item, 0, 5) + items = append(items, kv.Item{ + Key: "a", + Value: "aa", + }) + items = append(items, kv.Item{ + Key: "b", + Value: "bb", + }) + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + items = append(items, kv.Item{ + Key: "c", + Value: "cc", + TTL: tt, + }) + + items = append(items, kv.Item{ + Key: "d", + Value: "dd", + }) + + items = append(items, kv.Item{ + Key: "e", + Value: "ee", + }) + + // Register 3 keys with values + err = client.Call("boltdb.Set", items, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + keys := []string{"a", "b", "c"} + err = client.Call("boltdb.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("boltdb.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + keys = []string{"a", "b", "c"} + err = client.Call("boltdb.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, string("aa"), mGet["a"].(string)) + assert.Equal(t, string("bb"), mGet["b"].(string)) + + mExpKeys := make([]kv.Item, 0, 2) + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2}) + mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2}) + mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2}) + + // MEXPIRE + var mExpRes bool + err = client.Call("boltdb.MExpire", mExpKeys, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL + keys = []string{"a", "b", "d"} + ttlRes := make(map[string]interface{}) + err = client.Call("boltdb.TTL", keys, &ttlRes) + assert.NoError(t, err) + assert.Len(t, ttlRes, 3) + + // HAS AFTER TTL + time.Sleep(time.Second * 11) + ret = make(map[string]bool) + keys = []string{"a", "b", "d"} + err = client.Call("boltdb.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keys = []string{"e"} + var delRet bool + err = client.Call("boltdb.Delete", keys, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + keys = []string{"e"} + err = client.Call("boltdb.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} diff --git a/tests/plugins/kv/memcached/configs/.rr-init.yaml b/tests/plugins/kv/memcached/configs/.rr-init.yaml new file mode 100644 index 00000000..759fc3ba --- /dev/null +++ b/tests/plugins/kv/memcached/configs/.rr-init.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6001 + disabled: false + +server: + command: "php ../../../psr-worker-bench.php" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relayTimeout: "20s" + +logs: + mode: development + level: debug + +http: + address: 127.0.0.1:44933 + maxRequestSize: 1024 + middleware: ["gzip", "headers"] + uploads: + forbid: [".php", ".exe", ".bat"] + trustedSubnets: + [ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + ] + pool: + numWorkers: 6 + maxJobs: 0 + allocateTimeout: 60s + destroyTimeout: 60s + +# boltdb simple driver +memcached: + addr: + - "localhost:11211"
\ No newline at end of file diff --git a/tests/plugins/kv/memcached/plugin_test.go b/tests/plugins/kv/memcached/plugin_test.go new file mode 100644 index 00000000..d30de3b6 --- /dev/null +++ b/tests/plugins/kv/memcached/plugin_test.go @@ -0,0 +1,195 @@ +package memcached_test + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/memcached" + "github.com/spiral/roadrunner/v2/plugins/logger" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestMemcache(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &memcached.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testMemcachedRPCMethods", testRPCMethods) + stopCh <- struct{}{} + wg.Wait() + + _ = os.Remove("rr") +} + +func testRPCMethods(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var setRes bool + items := make([]kv.Item, 0, 5) + items = append(items, kv.Item{ + Key: "a", + Value: "aa", + }) + items = append(items, kv.Item{ + Key: "b", + Value: "bb", + }) + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + items = append(items, kv.Item{ + Key: "c", + Value: "cc", + TTL: tt, + }) + + items = append(items, kv.Item{ + Key: "d", + Value: "dd", + }) + + items = append(items, kv.Item{ + Key: "e", + Value: "ee", + }) + + // Register 3 keys with values + err = client.Call("memcached.Set", items, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + keys := []string{"a", "b", "c"} + err = client.Call("memcached.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("memcached.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + keys = []string{"a", "b", "c"} + err = client.Call("memcached.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, string("aa"), mGet["a"].(string)) + assert.Equal(t, string("bb"), mGet["b"].(string)) + + mExpKeys := make([]kv.Item, 0, 2) + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2}) + mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2}) + mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2}) + + // MEXPIRE + var mExpRes bool + err = client.Call("memcached.MExpire", mExpKeys, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL + keys = []string{"a", "b", "d"} + ttlRes := make(map[string]interface{}) + err = client.Call("memcached.TTL", keys, &ttlRes) + assert.NoError(t, err) + assert.Len(t, ttlRes, 3) + + // HAS AFTER TTL + time.Sleep(time.Second * 11) + ret = make(map[string]bool) + keys = []string{"a", "b", "d"} + err = client.Call("memcached.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keys = []string{"e"} + var delRet bool + err = client.Call("memcached.Delete", keys, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + keys = []string{"e"} + err = client.Call("memcached.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} diff --git a/tests/plugins/kv/memory/configs/.rr-init.yaml b/tests/plugins/kv/memory/configs/.rr-init.yaml new file mode 100644 index 00000000..3dceea95 --- /dev/null +++ b/tests/plugins/kv/memory/configs/.rr-init.yaml @@ -0,0 +1,42 @@ +rpc: + listen: tcp://127.0.0.1:6001 + disabled: false + +server: + command: "php ../../../psr-worker-bench.php" + user: "" + group: "" + env: + "RR_HTTP": "true" + relay: "pipes" + relayTimeout: "20s" + +logs: + mode: development + level: debug + +http: + address: 127.0.0.1:44933 + maxRequestSize: 1024 + middleware: ["gzip", "headers"] + uploads: + forbid: [".php", ".exe", ".bat"] + trustedSubnets: + [ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + ] + pool: + numWorkers: 6 + maxJobs: 0 + allocateTimeout: 60s + destroyTimeout: 60s + +# in memory KV driver +memory: + enabled: true
\ No newline at end of file diff --git a/tests/plugins/kv/memory/plugin_test.go b/tests/plugins/kv/memory/plugin_test.go new file mode 100644 index 00000000..c6f94602 --- /dev/null +++ b/tests/plugins/kv/memory/plugin_test.go @@ -0,0 +1,195 @@ +package memory_test + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/memory" + "github.com/spiral/roadrunner/v2/plugins/logger" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestInMemory(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &memory.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testInMemoryRPCMethods", testRPCMethods) + stopCh <- struct{}{} + wg.Wait() + + _ = os.Remove("rr") +} + +func testRPCMethods(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var setRes bool + items := make([]kv.Item, 0, 5) + items = append(items, kv.Item{ + Key: "a", + Value: "aa", + }) + items = append(items, kv.Item{ + Key: "b", + Value: "bb", + }) + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + items = append(items, kv.Item{ + Key: "c", + Value: "cc", + TTL: tt, + }) + + items = append(items, kv.Item{ + Key: "d", + Value: "dd", + }) + + items = append(items, kv.Item{ + Key: "e", + Value: "ee", + }) + + // Register 3 keys with values + err = client.Call("memory.Set", items, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + keys := []string{"a", "b", "c"} + err = client.Call("memory.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("memory.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + keys = []string{"a", "b", "c"} + err = client.Call("memory.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, string("aa"), mGet["a"].(string)) + assert.Equal(t, string("bb"), mGet["b"].(string)) + + mExpKeys := make([]kv.Item, 0, 2) + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2}) + mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2}) + mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2}) + + // MEXPIRE + var mExpRes bool + err = client.Call("memory.MExpire", mExpKeys, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL + keys = []string{"a", "b", "d"} + ttlRes := make(map[string]interface{}) + err = client.Call("memory.TTL", keys, &ttlRes) + assert.NoError(t, err) + assert.Len(t, ttlRes, 3) + + // HAS AFTER TTL + time.Sleep(time.Second * 11) + ret = make(map[string]bool) + keys = []string{"a", "b", "d"} + err = client.Call("memory.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keys = []string{"e"} + var delRet bool + err = client.Call("memory.Delete", keys, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + keys = []string{"e"} + err = client.Call("memory.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} |