summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-07 01:06:50 +0300
committerValery Piashchynski <[email protected]>2021-01-07 01:06:50 +0300
commitc1465d3bcdf24a78440300aa51e7cfc92ce874a8 (patch)
tree6e0f5107eba90df73724b6611ca6adfa148d2a3f
parentc9f670ee734355cbc5d504186946b7db67cf62b5 (diff)
KV, updated, bug fixed, with intergration tests via plugins
-rw-r--r--.github/workflows/build.yml2
-rw-r--r--.vscode/launch.json16
-rw-r--r--.vscode/settings.json1
-rwxr-xr-xMakefile8
-rwxr-xr-xgo.sum21
-rw-r--r--plugins/kv/boltdb/config.go11
-rw-r--r--plugins/kv/boltdb/plugin.go148
-rw-r--r--plugins/kv/boltdb/plugin_unit_test.go172
-rw-r--r--plugins/kv/interface.go20
-rw-r--r--plugins/kv/memcached/plugin.go25
-rw-r--r--plugins/kv/memcached/storage_test.go108
-rw-r--r--plugins/kv/memory/plugin.go (renamed from plugins/kv/memory/storage.go)31
-rw-r--r--plugins/kv/memory/storage_test.go114
-rw-r--r--plugins/kv/rpc.go110
-rwxr-xr-xrrbin0 -> 18108416 bytes
-rw-r--r--tests/docker-compose.yaml7
-rw-r--r--tests/plugins/kv/boltdb/configs/.rr-init.yaml46
-rw-r--r--tests/plugins/kv/boltdb/plugin_test.go195
-rw-r--r--tests/plugins/kv/memcached/configs/.rr-init.yaml43
-rw-r--r--tests/plugins/kv/memcached/plugin_test.go195
-rw-r--r--tests/plugins/kv/memory/configs/.rr-init.yaml42
-rw-r--r--tests/plugins/kv/memory/plugin_test.go195
22 files changed, 1194 insertions, 316 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index b0cd285d..4c57125d 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -65,6 +65,7 @@ jobs:
- name: Run golang tests on Windows without codecov
if: ${{ matrix.os == 'windows-latest' }}
run: |
+ docker-compose -f ./tests/docker-compose.yaml up -d
go test -v -race -cover -tags=debug ./utils
go test -v -race -cover -tags=debug ./pkg/pipe
go test -v -race -cover -tags=debug ./pkg/pool
@@ -88,6 +89,7 @@ jobs:
- name: Run golang tests on Linux and MacOS
if: ${{ matrix.os != 'windows-latest' }}
run: |
+ docker-compose -f ./tests/docker-compose.yaml up -d
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/utils.txt -covermode=atomic ./utils
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 3cf21fed..f43ef860 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -5,6 +5,13 @@
"version": "0.2.0",
"configurations": [
{
+ "name": "Launch test file",
+ "type": "go",
+ "request": "launch",
+ "mode": "test",
+ "program": "${file}"
+ },
+ {
"name": "Launch main debug, race",
"type": "go",
"request": "launch",
@@ -13,13 +20,6 @@
"buildFlags": "-race",
"args": ["serve", "-c", "../.rr.yaml"],
"program": "${workspaceFolder}/cmd/main.go"
- },
- {
- "name": "Launch file",
- "type": "go",
- "request": "launch",
- "mode": "debug",
- "program": "${file}"
- },
+ }
]
} \ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 70a50b98..78560788 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -8,6 +8,7 @@
"cSpell.words": [
"asdf",
"bbolt",
+ "gofiber",
"stopc",
"treshholdc"
]
diff --git a/Makefile b/Makefile
index 30eb2942..9b9a86aa 100755
--- a/Makefile
+++ b/Makefile
@@ -25,6 +25,7 @@ uninstall: ## Uninstall locally installed RR
test: ## Run application tests
#go clean -testcache
+ docker-compose -f docker-compose.yaml up -d
go test -v -race -cover -tags=debug -covermode=atomic ./utils
go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe
go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool
@@ -46,9 +47,16 @@ test: ## Run application tests
go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/static
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memory
+ docker-compose down
lint: ## Run application linters
golangci-lint run
kv:
+ docker-compose -f tests/docker-compose.yaml up -d
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memory
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memcached
+ go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/boltdb
+ go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/memory
+ go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/kv/memcached
+ docker-compose down
diff --git a/go.sum b/go.sum
index 38deb795..c4f1f187 100755
--- a/go.sum
+++ b/go.sum
@@ -60,6 +60,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/containerd/containerd v1.4.3 h1:ijQT13JedHSHrQGWFcGEwzcNKrAGIiZ+jSD5QQG07SY=
+github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -75,6 +77,15 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug=
+github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
+github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo=
+github.com/docker/docker v20.10.2+incompatible h1:vFgEHPqWBTp4pTjdLwjAA4bSo3gvIGOYwuJTlEjVBCw=
+github.com/docker/docker v20.10.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
+github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
+github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
+github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
@@ -103,6 +114,7 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/gofiber/fiber/v2 v2.3.0 h1:82ufvLne0cxzdkDOeLkUmteA+z1uve9JQ/ZFsMOnkzc=
github.com/gofiber/fiber/v2 v2.3.0/go.mod h1:f8BRRIMjMdRyt2qmJ/0Sea3j3rwwfufPrh9WNBRiVZ0=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -244,6 +256,10 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U=
github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ=
+github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
+github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
+github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI=
+github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -290,6 +306,7 @@ github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -328,6 +345,8 @@ github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U=
github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.7 h1:GRN7Sjk4yVavD2W+1fUWBjqoivWQsnbsXbX7xyhZhbU=
github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/goridge v1.0.4 h1:qnYtI84H0tcYjcbFdFl/VUFQZ0YUE9p+VuU8In4kC/8=
+github.com/spiral/goridge v2.1.4+incompatible h1:L15TKrbPEp/G6JfS3jjuvY6whkhfD292XX+1iy9mO2k=
github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4=
github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
@@ -530,10 +549,12 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
+google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
+google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go
index 6b116611..b2e1e636 100644
--- a/plugins/kv/boltdb/config.go
+++ b/plugins/kv/boltdb/config.go
@@ -8,16 +8,17 @@ type Config struct {
File string
// Bucket to store data in boltDB
Bucket string
-
+ // db file permissions
Permissions int
- TTL int
+ // timeout
+ Interval uint `yaml:"interval"`
}
-func (s *Config) InitDefaults() error {
+// InitDefaults initializes default values for the boltdb
+func (s *Config) InitDefaults() {
s.Dir = "." // current dir
s.Bucket = "rr" // default bucket name
s.File = "rr.db" // default file name
s.Permissions = 0777 // free for all
- s.TTL = 60 // 60 seconds is default TTL
- return nil
+ s.Interval = 60 // default is 60 seconds timeout
}
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go
index e5eda0c2..6cfc49f6 100644
--- a/plugins/kv/boltdb/plugin.go
+++ b/plugins/kv/boltdb/plugin.go
@@ -2,7 +2,6 @@ package boltdb
import (
"bytes"
- "context"
"encoding/gob"
"os"
"path"
@@ -41,78 +40,23 @@ type Plugin struct {
stop chan struct{}
}
-// NewBoltClient instantiate new BOLTDB client
-// The parameters are:
-// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
-// perm os.FileMode -- file permissions, for example 0777
-// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
-// bucket string -- name of the bucket to use, should be UTF-8
-func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
- const op = errors.Op("newBoltClient")
- db, err := bolt.Open(path, perm, options)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // bucket should be SET
- if bucket == "" {
- return nil, errors.E(op, errors.Str("bucket should be set"))
- }
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- _, err = tx.CreateBucketIfNotExists([]byte(bucket))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // if TTL is not set, make it default
- if ttl == 0 {
- ttl = time.Minute
- }
-
- s := &Plugin{
- DB: db,
- bucket: []byte(bucket),
- stop: make(chan struct{}),
- timeout: ttl,
- gc: &sync.Map{},
- }
-
- // start the TTL gc
- go s.gcPhase()
-
- return s, nil
-}
-
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
const op = errors.Op("boltdb plugin init")
s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
err := cfg.UnmarshalKey(PluginName, &s.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
+ // set the logger
s.log = log
- return nil
-}
-
-func (s *Plugin) Serve() chan error {
- const op = errors.Op("boltdb serve")
- errCh := make(chan error, 1)
-
db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
if err != nil {
- errCh <- errors.E(op, err)
- return errCh
+ return errors.E(op, err)
}
// create bucket if it does not exist
@@ -125,24 +69,29 @@ func (s *Plugin) Serve() chan error {
}
return nil
})
+
if err != nil {
- errCh <- err
- return errCh
+ return errors.E(op, err)
}
s.DB = db
s.bucket = []byte(s.cfg.Bucket)
s.stop = make(chan struct{})
- s.timeout = time.Duration(s.cfg.TTL) * time.Second
+ s.timeout = time.Duration(s.cfg.Interval) * time.Second
s.gc = &sync.Map{}
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
// start the TTL gc
go s.gcPhase()
return errCh
}
-func (s Plugin) Stop() error {
+func (s *Plugin) Stop() error {
const op = errors.Op("boltdb stop")
err := s.Close()
if err != nil {
@@ -151,8 +100,9 @@ func (s Plugin) Stop() error {
return nil
}
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("boltdb Has")
+ s.log.Debug("boltdb HAS method called", "args", keys)
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
@@ -164,8 +114,8 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return errors.E(op, errors.EmptyKey)
}
@@ -173,24 +123,25 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- exist := b.Get([]byte(key))
+ exist := b.Get([]byte(keys[i]))
if exist != nil {
- m[key] = true
+ m[keys[i]] = true
}
}
return nil
})
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
+ s.log.Debug("boltdb HAS method finished")
return m, nil
}
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s *Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("boltdb Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -211,7 +162,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
buf := bytes.NewReader(val)
decoder := gob.NewDecoder(buf)
- i := kv.Item{}
+ var i string
err := decoder.Decode(&i)
if err != nil {
// unsafe (w/o runes) convert
@@ -219,7 +170,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
}
// set the value
- val = []byte(i.Value)
+ val = []byte(i)
}
return nil
})
@@ -230,7 +181,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
return val, nil
}
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("boltdb MGet")
// defence
if keys == nil {
@@ -238,8 +189,8 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
}
// should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
@@ -253,10 +204,22 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
return errors.E(op, errors.NoSuchBucket)
}
- for _, key := range keys {
- value := b.Get([]byte(key))
+ buf := new(bytes.Buffer)
+ var out string
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
if value != nil {
- m[key] = value
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = ""
}
}
@@ -270,7 +233,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
}
// Set puts the K/V to the bolt
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) Set(items ...kv.Item) error {
const op = errors.Op("boltdb Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -303,7 +266,8 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
return errors.E(op, errors.EmptyItem)
}
- err = encoder.Encode(&items[i])
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
if err != nil {
return errors.E(op, err)
}
@@ -321,6 +285,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
if err != nil {
return errors.E(op, err)
}
+ // Store key TTL in the separate map
s.gc.Store(items[i].Key, items[i].TTL)
}
@@ -331,7 +296,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
}
// Delete all keys from DB
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s *Plugin) Delete(keys ...string) error {
const op = errors.Op("boltdb Delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -378,7 +343,7 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("boltdb MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -396,7 +361,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
return nil
}
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("boltdb TTL")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -422,15 +387,25 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}
}
// Close the DB connection
-func (s Plugin) Close() error {
+func (s *Plugin) Close() error {
// stop the keys GC
s.stop <- struct{}{}
return s.DB.Close()
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// ========================= PRIVATE =================================
-func (s Plugin) gcPhase() {
+func (s *Plugin) gcPhase() {
t := time.NewTicker(s.timeout)
defer t.Stop()
for {
@@ -449,6 +424,7 @@ func (s Plugin) gcPhase() {
if now.After(v) {
// time expired
s.gc.Delete(k)
+ s.log.Debug("key deleted", "key", k)
err := s.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
if b == nil {
diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go
index a12c830d..2459e493 100644
--- a/plugins/kv/boltdb/plugin_unit_test.go
+++ b/plugins/kv/boltdb/plugin_unit_test.go
@@ -1,19 +1,74 @@
package boltdb
import (
- "context"
"os"
"strconv"
"sync"
"testing"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/stretchr/testify/assert"
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
)
+// NewBoltClient instantiate new BOLTDB client
+// The parameters are:
+// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
+// perm os.FileMode -- file permissions, for example 0777
+// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
+// bucket string -- name of the bucket to use, should be UTF-8
+func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
+ const op = errors.Op("newBoltClient")
+ db, err := bolt.Open(path, perm, options)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // bucket should be SET
+ if bucket == "" {
+ return nil, errors.E(op, errors.Str("bucket should be set"))
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ _, err = tx.CreateBucketIfNotExists([]byte(bucket))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if TTL is not set, make it default
+ if ttl == 0 {
+ ttl = time.Minute
+ }
+
+ l, _ := zap.NewDevelopment()
+ s := &Plugin{
+ DB: db,
+ bucket: []byte(bucket),
+ stop: make(chan struct{}),
+ timeout: ttl,
+ gc: &sync.Map{},
+ log: logger.NewZapAdapter(l),
+ }
+
+ // start the TTL gc
+ go s.gcPhase()
+
+ return s, nil
+}
+
func initStorage() kv.Storage {
- storage, err := NewBoltClient("rr.db", 0777, nil, "rr", time.Second)
+ storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second)
if err != nil {
panic(err)
}
@@ -36,7 +91,7 @@ func TestStorage_Has(t *testing.T) {
cleanup(t, "rr.db")
}()
- v, err := s.Has(context.Background(), "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
}
@@ -50,13 +105,12 @@ func TestStorage_Has_Set_Has(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -66,7 +120,7 @@ func TestStorage_Has_Set_Has(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -82,13 +136,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -98,7 +151,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -115,7 +168,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
m.Lock()
// set is writable transaction
// it should stop readable
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key" + strconv.Itoa(i),
Value: "hello world" + strconv.Itoa(i),
TTL: "",
@@ -133,7 +186,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.RLock()
- v, err = s.Has(ctx, "key")
+ v, err = s.Has("key")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -146,7 +199,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.Lock()
- err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ err = s.Delete("key" + strconv.Itoa(i))
assert.NoError(t, err)
m.Unlock()
}
@@ -164,13 +217,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -180,13 +232,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
@@ -200,13 +252,12 @@ func TestStorage_Has_Set_Get(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -216,13 +267,13 @@ func TestStorage_Has_Set_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
- // no such key
+
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.Get(ctx, "key")
+ res, err := s.Get("key")
assert.NoError(t, err)
if string(res) != "hello world" {
@@ -239,13 +290,12 @@ func TestStorage_Set_Del_Get(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -255,20 +305,20 @@ func TestStorage_Set_Del_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
// check that keys are present
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
- assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ assert.NoError(t, s.Delete("key", "key2"))
// check that keys are not present
- res, err = s.MGet(ctx, "key", "key2")
+ res, err = s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 0)
}
@@ -281,13 +331,12 @@ func TestStorage_Set_GetM(t *testing.T) {
}
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -297,14 +346,13 @@ func TestStorage_Set_GetM(t *testing.T) {
TTL: "",
}))
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
func TestNilAndWrongArgs(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -313,61 +361,60 @@ func TestNilAndWrongArgs(t *testing.T) {
}()
// check
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- _, err = s.Has(ctx, "")
+ _, err = s.Has("")
assert.Error(t, err)
- _, err = s.Get(ctx, "")
+ _, err = s.Get("")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", "")
+ _, err = s.MGet("key", "key2", "")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", " ")
+ _, err = s.MGet("key", "key2", " ")
assert.Error(t, err)
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
}))
- assert.Error(t, s.Set(ctx, kv.Item{
+ assert.Error(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "asdf",
}))
- _, err = s.Has(ctx, "key")
+ _, err = s.Has("key")
assert.NoError(t, err)
- assert.Error(t, s.Set(ctx, kv.Item{}))
+ assert.Error(t, s.Set(kv.Item{}))
- err = s.Delete(ctx, "")
+ err = s.Delete("")
assert.Error(t, err)
- err = s.Delete(ctx, "key", "")
+ err = s.Delete("key", "")
assert.Error(t, err)
- err = s.Delete(ctx, "key", " ")
+ err = s.Delete("key", " ")
assert.Error(t, err)
- err = s.Delete(ctx, "key")
+ err = s.Delete("key")
assert.NoError(t, err)
}
func TestStorage_MExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -376,12 +423,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -404,12 +451,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
Value: "",
TTL: nowPlusFive,
}
- assert.NoError(t, s.MExpire(ctx, i1, i2))
+ assert.NoError(t, s.MExpire(i1, i2))
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -417,7 +464,6 @@ func TestStorage_MExpire_TTL(t *testing.T) {
func TestStorage_SetExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -426,12 +472,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -445,7 +491,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
// set timeout to 5 sec
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: nowPlusFive,
@@ -457,7 +503,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}))
time.Sleep(time.Second * 2)
- m, err := s.TTL(ctx, "key", "key2")
+ m, err := s.TTL("key", "key2")
assert.NoError(t, err)
// remove a precision 4.02342342 -> 4
@@ -478,7 +524,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
time.Sleep(time.Second * 4)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 3512fd73..c1367cdf 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,10 +1,6 @@
package kv
// Item represents general storage item
-import (
- "context"
-)
-
type Item struct {
// Key of item
Key string
@@ -17,28 +13,28 @@ type Item struct {
// Storage represents single abstract storage.
type Storage interface {
// Has checks if value exists.
- Has(ctx context.Context, keys ...string) (map[string]bool, error)
+ Has(keys ...string) (map[string]bool, error)
// Get loads value content into a byte slice.
- Get(ctx context.Context, key string) ([]byte, error)
+ Get(key string) ([]byte, error)
// MGet loads content of multiple values
- // If there are no values for keys, key will no be in the map
- MGet(ctx context.Context, keys ...string) (map[string]interface{}, error)
+ // Returns the map with existing keys and associated values
+ MGet(keys ...string) (map[string]interface{}, error)
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(ctx context.Context, items ...Item) error
+ Set(items ...Item) error
// MExpire sets the TTL for multiply keys
- MExpire(ctx context.Context, items ...Item) error
+ MExpire(items ...Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
- TTL(ctx context.Context, keys ...string) (map[string]interface{}, error)
+ TTL(keys ...string) (map[string]interface{}, error)
// Delete one or multiple keys.
- Delete(ctx context.Context, keys ...string) error
+ Delete(keys ...string) error
// Close closes the storage and underlying resources.
Close() error
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go
index 69f96bfe..f5111c04 100644
--- a/plugins/kv/memcached/plugin.go
+++ b/plugins/kv/memcached/plugin.go
@@ -1,7 +1,6 @@
package memcached
import (
- "context"
"strings"
"time"
@@ -58,8 +57,18 @@ func (s *Plugin) Stop() error {
return nil
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// Has checks the key for existence
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("memcached Has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -84,7 +93,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("memcached Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -106,7 +115,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
// return map with key -- string
// and map value as value -- []byte
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("memcached MGet")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -141,7 +150,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s Plugin) Set(items ...kv.Item) error {
const op = errors.Op("memcached Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -182,7 +191,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("memcached MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -209,12 +218,12 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
}
// return time in seconds (int32) for a given keys
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("memcached HTTLas")
return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
}
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s Plugin) Delete(keys ...string) error {
const op = errors.Op("memcached Has")
if keys == nil {
return errors.E(op, errors.NoKeys)
diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go
index 4b59bbd0..3d37748b 100644
--- a/plugins/kv/memcached/storage_test.go
+++ b/plugins/kv/memcached/storage_test.go
@@ -1,7 +1,6 @@
package memcached
import (
- "context"
"strconv"
"sync"
"testing"
@@ -16,7 +15,7 @@ func initStorage() kv.Storage {
}
func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(context.Background(), keys...)
+ err := s.Delete(keys...)
if err != nil {
t.Fatalf("error during cleanup: %s", err.Error())
}
@@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) {
func TestStorage_Has(t *testing.T) {
s := initStorage()
- ctx := context.Background()
-
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
}
@@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -57,7 +53,7 @@ func TestStorage_Has_Set_Has(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -73,13 +69,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -89,13 +84,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
@@ -109,13 +104,12 @@ func TestStorage_Has_Set_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -125,13 +119,13 @@ func TestStorage_Has_Set_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.Get(ctx, "key")
+ res, err := s.Get("key")
assert.NoError(t, err)
if string(res) != "hello world" {
@@ -148,13 +142,12 @@ func TestStorage_Set_Del_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -164,27 +157,26 @@ func TestStorage_Set_Del_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
// check that keys are present
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
- assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ assert.NoError(t, s.Delete("key", "key2"))
// check that keys are not present
- res, err = s.MGet(ctx, "key", "key2")
+ res, err = s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 0)
}
func TestStorage_Set_GetM(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
@@ -194,11 +186,11 @@ func TestStorage_Set_GetM(t *testing.T) {
}
}()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -208,14 +200,13 @@ func TestStorage_Set_GetM(t *testing.T) {
TTL: "",
}))
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
func TestStorage_MExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
if err := s.Close(); err != nil {
@@ -224,12 +215,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -252,12 +243,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
Value: "",
TTL: nowPlusFive,
}
- assert.NoError(t, s.MExpire(ctx, i1, i2))
+ assert.NoError(t, s.MExpire(i1, i2))
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -265,7 +256,6 @@ func TestStorage_MExpire_TTL(t *testing.T) {
func TestNilAndWrongArgs(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key")
if err := s.Close(); err != nil {
@@ -274,46 +264,45 @@ func TestNilAndWrongArgs(t *testing.T) {
}()
// check
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- _, err = s.Has(ctx, "")
+ _, err = s.Has("")
assert.Error(t, err)
- _, err = s.Get(ctx, "")
+ _, err = s.Get("")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", "")
+ _, err = s.MGet("key", "key2", "")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", " ")
+ _, err = s.MGet("key", "key2", " ")
assert.Error(t, err)
- assert.Error(t, s.Set(ctx, kv.Item{}))
+ assert.Error(t, s.Set(kv.Item{}))
- err = s.Delete(ctx, "")
+ err = s.Delete("")
assert.Error(t, err)
- err = s.Delete(ctx, "key", "")
+ err = s.Delete("key", "")
assert.Error(t, err)
- err = s.Delete(ctx, "key", " ")
+ err = s.Delete("key", " ")
assert.Error(t, err)
- err = s.Delete(ctx, "key")
+ err = s.Delete("key")
assert.NoError(t, err)
}
func TestStorage_SetExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
if err := s.Close(); err != nil {
@@ -322,12 +311,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -341,7 +330,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
// set timeout to 5 sec
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: nowPlusFive,
@@ -355,7 +344,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -370,13 +359,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -386,7 +374,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -403,7 +391,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
m.Lock()
// set is writable transaction
// it should stop readable
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key" + strconv.Itoa(i),
Value: "hello world" + strconv.Itoa(i),
TTL: "",
@@ -421,7 +409,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.RLock()
- v, err = s.Has(ctx, "key")
+ v, err = s.Has("key")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -434,7 +422,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.Lock()
- err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ err = s.Delete("key" + strconv.Itoa(i))
assert.NoError(t, err)
m.Unlock()
}
diff --git a/plugins/kv/memory/storage.go b/plugins/kv/memory/plugin.go
index f4bdacea..2c65f14c 100644
--- a/plugins/kv/memory/storage.go
+++ b/plugins/kv/memory/plugin.go
@@ -1,7 +1,6 @@
package memory
import (
- "context"
"strings"
"sync"
"time"
@@ -49,7 +48,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (s Plugin) Serve() chan error {
+func (s *Plugin) Serve() chan error {
errCh := make(chan error, 1)
// start in-memory gc for kv
go s.gc()
@@ -57,7 +56,7 @@ func (s Plugin) Serve() chan error {
return errCh
}
-func (s Plugin) Stop() error {
+func (s *Plugin) Stop() error {
const op = errors.Op("in-memory storage stop")
err := s.Close()
if err != nil {
@@ -66,7 +65,7 @@ func (s Plugin) Stop() error {
return nil
}
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in-memory storage Has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -86,7 +85,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
return m, nil
}
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s *Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("in-memory storage Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -102,7 +101,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
return nil, nil
}
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("in-memory storage MGet")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -127,7 +126,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
return m, nil
}
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) Set(items ...kv.Item) error {
const op = errors.Op("in-memory storage Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -150,7 +149,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("in-memory storage MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -179,7 +178,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
return nil
}
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("in-memory storage TTL")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -203,7 +202,7 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}
return m, nil
}
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s *Plugin) Delete(keys ...string) error {
const op = errors.Op("in-memory storage Delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -224,11 +223,21 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error {
}
// Close clears the in-memory storage
-func (s Plugin) Close() error {
+func (s *Plugin) Close() error {
s.stop <- struct{}{}
return nil
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// ================================== PRIVATE ======================================
func (s *Plugin) gc() {
diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go
index b7b46637..4b30460d 100644
--- a/plugins/kv/memory/storage_test.go
+++ b/plugins/kv/memory/storage_test.go
@@ -1,7 +1,6 @@
package memory
import (
- "context"
"strconv"
"sync"
"testing"
@@ -16,7 +15,7 @@ func initStorage() kv.Storage {
}
func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(context.Background(), keys...)
+ err := s.Delete(keys...)
if err != nil {
t.Fatalf("error during cleanup: %s", err.Error())
}
@@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) {
func TestStorage_Has(t *testing.T) {
s := initStorage()
- ctx := context.Background()
-
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
}
@@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -58,7 +54,7 @@ func TestStorage_Has_Set_Has(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -74,13 +70,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -91,13 +86,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
@@ -111,13 +106,12 @@ func TestStorage_Has_Set_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -128,13 +122,13 @@ func TestStorage_Has_Set_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.Get(ctx, "key")
+ res, err := s.Get("key")
assert.NoError(t, err)
if string(res) != "value" {
@@ -151,13 +145,12 @@ func TestStorage_Set_Del_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -168,27 +161,26 @@ func TestStorage_Set_Del_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
// check that keys are present
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
- assert.NoError(t, s.Delete(ctx, "key", "key2"))
- // check that keys are not presentps -eo state,uid,pid,ppid,rtprio,time,comm
- res, err = s.MGet(ctx, "key", "key2")
+ assert.NoError(t, s.Delete("key", "key2"))
+ // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm
+ res, err = s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 0)
}
func TestStorage_Set_GetM(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
@@ -198,11 +190,11 @@ func TestStorage_Set_GetM(t *testing.T) {
}
}()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -213,14 +205,13 @@ func TestStorage_Set_GetM(t *testing.T) {
TTL: "",
}))
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
func TestStorage_MExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
@@ -230,12 +221,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -258,12 +249,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
Value: "",
TTL: nowPlusFive,
}
- assert.NoError(t, s.MExpire(ctx, i1, i2))
+ assert.NoError(t, s.MExpire(i1, i2))
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -271,7 +262,6 @@ func TestStorage_MExpire_TTL(t *testing.T) {
func TestNilAndWrongArgs(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -279,48 +269,47 @@ func TestNilAndWrongArgs(t *testing.T) {
}()
// check
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- _, err = s.Has(ctx, "")
+ _, err = s.Has("")
assert.Error(t, err)
- _, err = s.Get(ctx, "")
+ _, err = s.Get("")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", "")
+ _, err = s.MGet("key", "key2", "")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", " ")
+ _, err = s.MGet("key", "key2", " ")
assert.Error(t, err)
- assert.NoError(t, s.Set(ctx, kv.Item{}))
- _, err = s.Has(ctx, "key")
+ assert.NoError(t, s.Set(kv.Item{}))
+ _, err = s.Has("key")
assert.NoError(t, err)
- err = s.Delete(ctx, "")
+ err = s.Delete("")
assert.Error(t, err)
- err = s.Delete(ctx, "key", "")
+ err = s.Delete("key", "")
assert.Error(t, err)
- err = s.Delete(ctx, "key", " ")
+ err = s.Delete("key", " ")
assert.Error(t, err)
- err = s.Delete(ctx, "key")
+ err = s.Delete("key")
assert.NoError(t, err)
}
func TestStorage_SetExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
if err := s.Close(); err != nil {
@@ -329,12 +318,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -348,7 +337,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
// set timeout to 5 sec
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: nowPlusFive,
@@ -360,7 +349,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}))
time.Sleep(time.Second * 2)
- m, err := s.TTL(ctx, "key", "key2")
+ m, err := s.TTL("key", "key2")
assert.NoError(t, err)
// remove a precision 4.02342342 -> 4
@@ -381,7 +370,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
time.Sleep(time.Second * 4)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -396,13 +385,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -412,7 +400,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -429,7 +417,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
m.Lock()
// set is writable transaction
// it should stop readable
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key" + strconv.Itoa(i),
Value: "hello world" + strconv.Itoa(i),
TTL: "",
@@ -447,7 +435,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.RLock()
- v, err = s.Has(ctx, "key")
+ v, err = s.Has("key")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -460,7 +448,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.Lock()
- err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ err = s.Delete("key" + strconv.Itoa(i))
assert.NoError(t, err)
m.Unlock()
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
new file mode 100644
index 00000000..751f0d12
--- /dev/null
+++ b/plugins/kv/rpc.go
@@ -0,0 +1,110 @@
+package kv
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Wrapper for the plugin
+type RPCServer struct {
+ // svc is a plugin implementing Storage interface
+ svc Storage
+ // Logger
+ log logger.Logger
+}
+
+// NewRPCServer construct RPC server for the particular plugin
+func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
+ return &RPCServer{
+ svc: srv,
+ log: log,
+ }
+}
+
+// data Data
+func (r *RPCServer) Has(in []string, res *map[string]bool) error {
+ const op = errors.Op("rpc server Has")
+ ret, err := r.svc.Has(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // update the value in the pointer
+ *res = ret
+ return nil
+}
+
+// in SetData
+func (r *RPCServer) Set(in []Item, ok *bool) error {
+ const op = errors.Op("rpc server Set")
+
+ err := r.svc.Set(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *ok = true
+ return nil
+}
+
+// in Data
+func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
+ const op = errors.Op("rpc server MGet")
+ ret, err := r.svc.MGet(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // update return value
+ *res = ret
+ return nil
+}
+
+// in Data
+func (r *RPCServer) MExpire(in []Item, ok *bool) error {
+ const op = errors.Op("rpc server MExpire")
+
+ err := r.svc.MExpire(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *ok = true
+ return nil
+}
+
+// in Data
+func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
+ const op = errors.Op("rpc server TTL")
+
+ ret, err := r.svc.TTL(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *res = ret
+ return nil
+}
+
+// in Data
+func (r *RPCServer) Delete(in []string, ok *bool) error {
+ const op = errors.Op("rpc server Delete")
+ err := r.svc.Delete(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+// in string, storages
+func (r *RPCServer) Close(storage string, ok *bool) error {
+ const op = errors.Op("rpc server Close")
+ err := r.svc.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ *ok = true
+
+ return nil
+}
diff --git a/rr b/rr
new file mode 100755
index 00000000..c0ff4c40
--- /dev/null
+++ b/rr
Binary files differ
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
new file mode 100644
index 00000000..fd1a48bf
--- /dev/null
+++ b/tests/docker-compose.yaml
@@ -0,0 +1,7 @@
+version: '3'
+
+services:
+ memcached:
+ image: memcached:latest
+ ports:
+ - "0.0.0.0:11211:11211" \ No newline at end of file
diff --git a/tests/plugins/kv/boltdb/configs/.rr-init.yaml b/tests/plugins/kv/boltdb/configs/.rr-init.yaml
new file mode 100644
index 00000000..4629a24b
--- /dev/null
+++ b/tests/plugins/kv/boltdb/configs/.rr-init.yaml
@@ -0,0 +1,46 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
+server:
+ command: "php ../../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+logs:
+ mode: development
+ level: debug
+
+http:
+ address: 127.0.0.1:44933
+ maxRequestSize: 1024
+ middleware: ["gzip", "headers"]
+ uploads:
+ forbid: [".php", ".exe", ".bat"]
+ trustedSubnets:
+ [
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ ]
+ pool:
+ numWorkers: 6
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s
+
+# boltdb simple driver
+boltdb:
+ dir: "."
+ file: "rr"
+ bucket: "test"
+ permissions: 777
+ interval: 1 # seconds
diff --git a/tests/plugins/kv/boltdb/plugin_test.go b/tests/plugins/kv/boltdb/plugin_test.go
new file mode 100644
index 00000000..ba9b695a
--- /dev/null
+++ b/tests/plugins/kv/boltdb/plugin_test.go
@@ -0,0 +1,195 @@
+package boltdb_tests //nolint:golint,stylecheck
+
+import (
+ "net"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/boltdb"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBoltDb(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &boltdb.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("testBoltDbRPCMethods", testRPCMethods)
+ stopCh <- struct{}{}
+ wg.Wait()
+
+ _ = os.Remove("rr")
+}
+
+func testRPCMethods(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var setRes bool
+ items := make([]kv.Item, 0, 5)
+ items = append(items, kv.Item{
+ Key: "a",
+ Value: "aa",
+ })
+ items = append(items, kv.Item{
+ Key: "b",
+ Value: "bb",
+ })
+ // add 5 second ttl
+ tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+ items = append(items, kv.Item{
+ Key: "c",
+ Value: "cc",
+ TTL: tt,
+ })
+
+ items = append(items, kv.Item{
+ Key: "d",
+ Value: "dd",
+ })
+
+ items = append(items, kv.Item{
+ Key: "e",
+ Value: "ee",
+ })
+
+ // Register 3 keys with values
+ err = client.Call("boltdb.Set", items, &setRes)
+ assert.NoError(t, err)
+ assert.True(t, setRes)
+
+ ret := make(map[string]bool)
+ keys := []string{"a", "b", "c"}
+ err = client.Call("boltdb.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 3) // should be 3
+
+ // key "c" should be deleted
+ time.Sleep(time.Second * 7)
+
+ ret = make(map[string]bool)
+ err = client.Call("boltdb.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 2) // should be 2
+
+ mGet := make(map[string]interface{})
+ keys = []string{"a", "b", "c"}
+ err = client.Call("boltdb.MGet", keys, &mGet)
+ assert.NoError(t, err)
+ assert.Len(t, mGet, 2) // c is expired
+ assert.Equal(t, string("aa"), mGet["a"].(string))
+ assert.Equal(t, string("bb"), mGet["b"].(string))
+
+ mExpKeys := make([]kv.Item, 0, 2)
+ tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
+ mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2})
+ mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2})
+ mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2})
+
+ // MEXPIRE
+ var mExpRes bool
+ err = client.Call("boltdb.MExpire", mExpKeys, &mExpRes)
+ assert.NoError(t, err)
+ assert.True(t, mExpRes)
+
+ // TTL
+ keys = []string{"a", "b", "d"}
+ ttlRes := make(map[string]interface{})
+ err = client.Call("boltdb.TTL", keys, &ttlRes)
+ assert.NoError(t, err)
+ assert.Len(t, ttlRes, 3)
+
+ // HAS AFTER TTL
+ time.Sleep(time.Second * 11)
+ ret = make(map[string]bool)
+ keys = []string{"a", "b", "d"}
+ err = client.Call("boltdb.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+
+ // DELETE
+ keys = []string{"e"}
+ var delRet bool
+ err = client.Call("boltdb.Delete", keys, &delRet)
+ assert.NoError(t, err)
+ assert.True(t, delRet)
+
+ // HAS AFTER DELETE
+ ret = make(map[string]bool)
+ keys = []string{"e"}
+ err = client.Call("boltdb.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+}
diff --git a/tests/plugins/kv/memcached/configs/.rr-init.yaml b/tests/plugins/kv/memcached/configs/.rr-init.yaml
new file mode 100644
index 00000000..759fc3ba
--- /dev/null
+++ b/tests/plugins/kv/memcached/configs/.rr-init.yaml
@@ -0,0 +1,43 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
+server:
+ command: "php ../../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+logs:
+ mode: development
+ level: debug
+
+http:
+ address: 127.0.0.1:44933
+ maxRequestSize: 1024
+ middleware: ["gzip", "headers"]
+ uploads:
+ forbid: [".php", ".exe", ".bat"]
+ trustedSubnets:
+ [
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ ]
+ pool:
+ numWorkers: 6
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s
+
+# boltdb simple driver
+memcached:
+ addr:
+ - "localhost:11211" \ No newline at end of file
diff --git a/tests/plugins/kv/memcached/plugin_test.go b/tests/plugins/kv/memcached/plugin_test.go
new file mode 100644
index 00000000..d30de3b6
--- /dev/null
+++ b/tests/plugins/kv/memcached/plugin_test.go
@@ -0,0 +1,195 @@
+package memcached_test
+
+import (
+ "net"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/memcached"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestMemcache(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &memcached.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("testMemcachedRPCMethods", testRPCMethods)
+ stopCh <- struct{}{}
+ wg.Wait()
+
+ _ = os.Remove("rr")
+}
+
+func testRPCMethods(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var setRes bool
+ items := make([]kv.Item, 0, 5)
+ items = append(items, kv.Item{
+ Key: "a",
+ Value: "aa",
+ })
+ items = append(items, kv.Item{
+ Key: "b",
+ Value: "bb",
+ })
+ // add 5 second ttl
+ tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+ items = append(items, kv.Item{
+ Key: "c",
+ Value: "cc",
+ TTL: tt,
+ })
+
+ items = append(items, kv.Item{
+ Key: "d",
+ Value: "dd",
+ })
+
+ items = append(items, kv.Item{
+ Key: "e",
+ Value: "ee",
+ })
+
+ // Register 3 keys with values
+ err = client.Call("memcached.Set", items, &setRes)
+ assert.NoError(t, err)
+ assert.True(t, setRes)
+
+ ret := make(map[string]bool)
+ keys := []string{"a", "b", "c"}
+ err = client.Call("memcached.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 3) // should be 3
+
+ // key "c" should be deleted
+ time.Sleep(time.Second * 7)
+
+ ret = make(map[string]bool)
+ err = client.Call("memcached.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 2) // should be 2
+
+ mGet := make(map[string]interface{})
+ keys = []string{"a", "b", "c"}
+ err = client.Call("memcached.MGet", keys, &mGet)
+ assert.NoError(t, err)
+ assert.Len(t, mGet, 2) // c is expired
+ assert.Equal(t, string("aa"), mGet["a"].(string))
+ assert.Equal(t, string("bb"), mGet["b"].(string))
+
+ mExpKeys := make([]kv.Item, 0, 2)
+ tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
+ mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2})
+ mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2})
+ mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2})
+
+ // MEXPIRE
+ var mExpRes bool
+ err = client.Call("memcached.MExpire", mExpKeys, &mExpRes)
+ assert.NoError(t, err)
+ assert.True(t, mExpRes)
+
+ // TTL
+ keys = []string{"a", "b", "d"}
+ ttlRes := make(map[string]interface{})
+ err = client.Call("memcached.TTL", keys, &ttlRes)
+ assert.NoError(t, err)
+ assert.Len(t, ttlRes, 3)
+
+ // HAS AFTER TTL
+ time.Sleep(time.Second * 11)
+ ret = make(map[string]bool)
+ keys = []string{"a", "b", "d"}
+ err = client.Call("memcached.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+
+ // DELETE
+ keys = []string{"e"}
+ var delRet bool
+ err = client.Call("memcached.Delete", keys, &delRet)
+ assert.NoError(t, err)
+ assert.True(t, delRet)
+
+ // HAS AFTER DELETE
+ ret = make(map[string]bool)
+ keys = []string{"e"}
+ err = client.Call("memcached.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+}
diff --git a/tests/plugins/kv/memory/configs/.rr-init.yaml b/tests/plugins/kv/memory/configs/.rr-init.yaml
new file mode 100644
index 00000000..3dceea95
--- /dev/null
+++ b/tests/plugins/kv/memory/configs/.rr-init.yaml
@@ -0,0 +1,42 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
+server:
+ command: "php ../../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+logs:
+ mode: development
+ level: debug
+
+http:
+ address: 127.0.0.1:44933
+ maxRequestSize: 1024
+ middleware: ["gzip", "headers"]
+ uploads:
+ forbid: [".php", ".exe", ".bat"]
+ trustedSubnets:
+ [
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ ]
+ pool:
+ numWorkers: 6
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s
+
+# in memory KV driver
+memory:
+ enabled: true \ No newline at end of file
diff --git a/tests/plugins/kv/memory/plugin_test.go b/tests/plugins/kv/memory/plugin_test.go
new file mode 100644
index 00000000..c6f94602
--- /dev/null
+++ b/tests/plugins/kv/memory/plugin_test.go
@@ -0,0 +1,195 @@
+package memory_test
+
+import (
+ "net"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/memory"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestInMemory(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &memory.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("testInMemoryRPCMethods", testRPCMethods)
+ stopCh <- struct{}{}
+ wg.Wait()
+
+ _ = os.Remove("rr")
+}
+
+func testRPCMethods(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var setRes bool
+ items := make([]kv.Item, 0, 5)
+ items = append(items, kv.Item{
+ Key: "a",
+ Value: "aa",
+ })
+ items = append(items, kv.Item{
+ Key: "b",
+ Value: "bb",
+ })
+ // add 5 second ttl
+ tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+ items = append(items, kv.Item{
+ Key: "c",
+ Value: "cc",
+ TTL: tt,
+ })
+
+ items = append(items, kv.Item{
+ Key: "d",
+ Value: "dd",
+ })
+
+ items = append(items, kv.Item{
+ Key: "e",
+ Value: "ee",
+ })
+
+ // Register 3 keys with values
+ err = client.Call("memory.Set", items, &setRes)
+ assert.NoError(t, err)
+ assert.True(t, setRes)
+
+ ret := make(map[string]bool)
+ keys := []string{"a", "b", "c"}
+ err = client.Call("memory.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 3) // should be 3
+
+ // key "c" should be deleted
+ time.Sleep(time.Second * 7)
+
+ ret = make(map[string]bool)
+ err = client.Call("memory.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 2) // should be 2
+
+ mGet := make(map[string]interface{})
+ keys = []string{"a", "b", "c"}
+ err = client.Call("memory.MGet", keys, &mGet)
+ assert.NoError(t, err)
+ assert.Len(t, mGet, 2) // c is expired
+ assert.Equal(t, string("aa"), mGet["a"].(string))
+ assert.Equal(t, string("bb"), mGet["b"].(string))
+
+ mExpKeys := make([]kv.Item, 0, 2)
+ tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
+ mExpKeys = append(mExpKeys, kv.Item{Key: "a", TTL: tt2})
+ mExpKeys = append(mExpKeys, kv.Item{Key: "b", TTL: tt2})
+ mExpKeys = append(mExpKeys, kv.Item{Key: "d", TTL: tt2})
+
+ // MEXPIRE
+ var mExpRes bool
+ err = client.Call("memory.MExpire", mExpKeys, &mExpRes)
+ assert.NoError(t, err)
+ assert.True(t, mExpRes)
+
+ // TTL
+ keys = []string{"a", "b", "d"}
+ ttlRes := make(map[string]interface{})
+ err = client.Call("memory.TTL", keys, &ttlRes)
+ assert.NoError(t, err)
+ assert.Len(t, ttlRes, 3)
+
+ // HAS AFTER TTL
+ time.Sleep(time.Second * 11)
+ ret = make(map[string]bool)
+ keys = []string{"a", "b", "d"}
+ err = client.Call("memory.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+
+ // DELETE
+ keys = []string{"e"}
+ var delRet bool
+ err = client.Call("memory.Delete", keys, &delRet)
+ assert.NoError(t, err)
+ assert.True(t, delRet)
+
+ // HAS AFTER DELETE
+ ret = make(map[string]bool)
+ keys = []string{"e"}
+ err = client.Call("memory.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+}