summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/release.yml2
-rwxr-xr-x.rr.yaml70
-rw-r--r--.vscode/launch.json25
-rw-r--r--.vscode/settings.json14
-rwxr-xr-xMakefile4
-rw-r--r--cmd/cli/root.go1
-rw-r--r--dput.cf2
-rwxr-xr-xgo.mod10
-rwxr-xr-xgo.sum64
-rwxr-xr-xpkg/pipe/pipe_factory_test.go3
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/static_pool_test.go2
-rwxr-xr-xpkg/worker/sync_worker.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go4
-rw-r--r--plugins/kv/boltdb/config.go23
-rw-r--r--plugins/kv/boltdb/plugin.go476
-rw-r--r--plugins/kv/boltdb/plugin_unit_test.go485
-rw-r--r--plugins/kv/interface.go45
-rw-r--r--plugins/kv/memcached/config.go10
-rw-r--r--plugins/kv/memcached/plugin.go243
-rw-r--r--plugins/kv/memcached/storage_test.go444
-rw-r--r--plugins/kv/memory/config.go12
-rw-r--r--plugins/kv/memory/storage.go263
-rw-r--r--plugins/kv/memory/storage_test.go470
-rw-r--r--plugins/reload/plugin.go2
-rw-r--r--plugins/reload/watcher.go4
-rw-r--r--tests/plugins/http/plugin_middleware.go8
27 files changed, 2654 insertions, 40 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index fbf9af0c..785f40ad 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -28,7 +28,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
- go-version: 1.15.5
+ go-version: 1.15.6
- name: Check out code
uses: actions/checkout@v2
diff --git a/.rr.yaml b/.rr.yaml
index 5f6d3cb2..ae75bb27 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -18,10 +18,19 @@ logs:
http:
address: 127.0.0.1:44933
maxRequestSize: 1024
- middleware: [ "gzip", "headers" ]
+ middleware: ["gzip", "headers"]
uploads:
- forbid: [ ".php", ".exe", ".bat" ]
- trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ forbid: [".php", ".exe", ".bat"]
+ trustedSubnets:
+ [
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ ]
pool:
numWorkers: 6
maxJobs: 0
@@ -50,29 +59,46 @@ redis:
# if the number of Addrs is two or more, a ClusterClient will be returned
addrs:
- - 'localhost:6379'
+ - "localhost:6379"
# if a MasterName is passed a sentinel-backed FailoverClient will be returned
- master_name: ''
- username: ''
- password: ''
+ master_name: ""
+ username: ""
+ password: ""
db: 0
- sentinel_password: ''
+ sentinel_password: ""
route_by_latency: false
route_randomly: false
dial_timeout: 0 # accepted values [1s, 5m, 3h]
max_retries: 1
- min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
- max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
pool_size: 0
min_idle_conns: 0
- max_conn_age: 0 # accepted values [1s, 5m, 3h]
- read_timeout: 0 # accepted values [1s, 5m, 3h]
- write_timeout: 0 # accepted values [1s, 5m, 3h]
- pool_timeout: 0 # accepted values [1s, 5m, 3h]
- idle_timeout: 0 # accepted values [1s, 5m, 3h]
- idle_check_freq: 0 # accepted values [1s, 5m, 3h]
+ max_conn_age: 0 # accepted values [1s, 5m, 3h]
+ read_timeout: 0 # accepted values [1s, 5m, 3h]
+ write_timeout: 0 # accepted values [1s, 5m, 3h]
+ pool_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_check_freq: 0 # accepted values [1s, 5m, 3h]
read_only: false
+# boltdb simple driver
+boltdb:
+ dir: "."
+ file: "rr"
+ bucket: "test"
+ permissions: 0777
+ TTL: 60 # seconds
+
+ # memcached driver
+memcached:
+ addr:
+ - "localhost:11211"
+
+# in memory KV driver
+memory:
+ enabled: true
+
metrics:
# prometheus client address (path /metrics added automatically)
address: localhost:2112
@@ -80,8 +106,8 @@ metrics:
app_metric:
type: histogram
help: "Custom application metric"
- labels: [ "type" ]
- buckets: [ 0.1, 0.2, 0.3, 1.0 ]
+ labels: ["type"]
+ buckets: [0.1, 0.2, 0.3, 1.0]
# objectives defines the quantile rank estimates with their respective
# absolute error [ for summary only ]
objectives:
@@ -92,16 +118,16 @@ reload:
# sync interval
interval: 1s
# global patterns to sync
- patterns: [ ".go" ]
+ patterns: [".go"]
# list of included for sync services
services:
http:
# recursive search for file patterns to add
recursive: true
# ignored folders
- ignore: [ "vendor" ]
+ ignore: ["vendor"]
# service specific file pattens to sync
- patterns: [ ".php", ".go",".md", ]
+ patterns: [".php", ".go", ".md"]
# directories to sync. If recursive is set to true,
# recursive sync will be applied only to the directories in `dirs` section
- dirs: [ "." ] \ No newline at end of file
+ dirs: ["."]
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 00000000..3cf21fed
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,25 @@
+{
+ // Use IntelliSense to learn about possible attributes.
+ // Hover to view descriptions of existing attributes.
+ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+ "version": "0.2.0",
+ "configurations": [
+ {
+ "name": "Launch main debug, race",
+ "type": "go",
+ "request": "launch",
+ "mode": "auto",
+ "showLog": true,
+ "buildFlags": "-race",
+ "args": ["serve", "-c", "../.rr.yaml"],
+ "program": "${workspaceFolder}/cmd/main.go"
+ },
+ {
+ "name": "Launch file",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "${file}"
+ },
+ ]
+} \ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..70a50b98
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,14 @@
+{
+ "workbench.editor.enablePreview": false,
+ "go.testFlags": ["-v", "-tags=debug", "-race"],
+ "go.lintTool": "golangci-lint",
+ "go.lintFlags": [
+ "--fast"
+ ],
+ "cSpell.words": [
+ "asdf",
+ "bbolt",
+ "stopc",
+ "treshholdc"
+ ]
+} \ No newline at end of file
diff --git a/Makefile b/Makefile
index 87999417..87f9f1a7 100755
--- a/Makefile
+++ b/Makefile
@@ -44,6 +44,10 @@ test: ## Run application tests
go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/rpc
go test -v -race -cover -tags=debug -covermode=atomic ./tests/plugins/static
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/memory
lint: ## Run application linters
golangci-lint run
+kv:
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/kv/boltdb
diff --git a/cmd/cli/root.go b/cmd/cli/root.go
index 06a84a82..10e389f2 100644
--- a/cmd/cli/root.go
+++ b/cmd/cli/root.go
@@ -46,6 +46,7 @@ func init() {
cobra.OnInitialize(func() {
if CfgFile != "" {
if absPath, err := filepath.Abs(CfgFile); err == nil {
+ println(absPath)
CfgFile = absPath
// force working absPath related to config file
diff --git a/dput.cf b/dput.cf
index 818b9507..d784a825 100644
--- a/dput.cf
+++ b/dput.cf
@@ -1,5 +1,5 @@
[roadrunner]
fqdn = ppa.launchpad.net
method = ftp
-incoming = 48d90782/ubuntu/roadrunner
+incoming = 48d90782/ubuntu/roadrunner
login = anonymous
diff --git a/go.mod b/go.mod
index 9eeafd4b..92c9953f 100755
--- a/go.mod
+++ b/go.mod
@@ -6,8 +6,9 @@ require (
github.com/NYTimes/gziphandler v1.1.1
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/alicebob/miniredis/v2 v2.14.1
+ github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
- github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
+ github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.10.0
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-redis/redis/v8 v8.4.4
@@ -17,17 +18,18 @@ require (
github.com/json-iterator/go v1.1.10
github.com/mattn/go-runewidth v0.0.9
github.com/olekukonko/tablewriter v0.0.4
- github.com/prometheus/client_golang v0.9.3
+ github.com/prometheus/client_golang v1.7.1
github.com/shirou/gopsutil v3.20.11+incompatible
github.com/spf13/cobra v1.1.1
- github.com/spf13/viper v1.7.0
+ github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta20
- github.com/spiral/errors v1.0.6
+ github.com/spiral/errors v1.0.7
github.com/spiral/goridge/v3 v3.0.0-beta8
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/vbauerster/mpb/v5 v5.4.0
github.com/yookoala/gofast v0.4.0
+ go.etcd.io/bbolt v1.3.5
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20201216054612-986b41b23924
diff --git a/go.sum b/go.sum
index 2798d923..38deb795 100755
--- a/go.sum
+++ b/go.sum
@@ -24,7 +24,9 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.14.1 h1:GjlbSeoJ24bzdLRs13HoMEeaRZx9kg5nHoRW7QV/nCs=
@@ -32,15 +34,22 @@ github.com/alicebob/miniredis/v2 v2.14.1/go.mod h1:uS970Sw5Gs9/iK3yBg0l9Uj9s25wX
github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4=
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
+github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
+github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
+github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 h1:gfAMKE626QEuKG3si0pdTRcr/YEbBoxY+3GOH3gWvl4=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
+github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
@@ -52,7 +61,9 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
@@ -66,6 +77,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
@@ -78,6 +91,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
@@ -127,6 +141,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
@@ -174,16 +189,19 @@ github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqy
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
@@ -195,6 +213,7 @@ github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/Qd
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@@ -231,32 +250,49 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
+github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
+github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
+github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
+github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
+github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
+github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@@ -269,6 +305,7 @@ github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4=
github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
@@ -277,8 +314,11 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
+github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
+github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spiral/endure v1.0.0-beta20 h1:QD3EJ6CRLgeo/6trfnlUcQhH3vrK8Hvf9ceDpde+yss=
github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o=
github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
@@ -286,8 +326,14 @@ github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U=
github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/errors v1.0.7 h1:GRN7Sjk4yVavD2W+1fUWBjqoivWQsnbsXbX7xyhZhbU=
+github.com/spiral/errors v1.0.7/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4=
github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
+github.com/spiral/kv v0.0.0-20200601133055-3397db7fc998 h1:TtGWRjpF7iQy1IA7nuJXVfhnJb9m39pf7YuBXNPcKMc=
+github.com/spiral/roadrunner v1.9.1 h1:905qx8bIQN/XBz+ScOqrPeKdqf0lqm9rXwO//b5N4C4=
+github.com/spiral/roadrunner v1.9.1/go.mod h1:Q1al1YGjs7ZHVkAA7+gUKM0rwk6XWG07G0UjyjjuK+0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -298,6 +344,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.18.0 h1:IV0DdMlatq9QO1Cr6wGJPVW1sV1Q8HvZXAIcjorylyM=
@@ -309,11 +356,15 @@ github.com/vbauerster/mpb/v5 v5.4.0/go.mod h1:fi4wVo7BVQ22QcvFObm+VwliQXlV1eBT8J
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo=
github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
+go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
+go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
+go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw=
@@ -336,6 +387,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -369,9 +421,12 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@@ -386,6 +441,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -399,6 +455,7 @@ golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -407,11 +464,14 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201210223839-7e3030f88018/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -473,6 +533,7 @@ google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBr
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@@ -485,6 +546,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.38.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
@@ -498,6 +561,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index dca09375..a2731294 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -46,7 +46,6 @@ func Test_Kill(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- // TODO changed from stopped, discuss
assert.Equal(t, internal.StateErrored, w.State().Value())
}()
@@ -465,7 +464,7 @@ func Test_Error(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
}
assert.Contains(t, err.Error(), "hello")
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 808e7d35..bb53e121 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -234,7 +234,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
- if errors.Is(errors.ErrSoftJob, err) {
+ if errors.Is(errors.SoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index acdd6ab7..53d6b191 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -158,7 +158,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 13212cc6..6a945cf4 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -43,7 +43,7 @@ func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
@@ -90,7 +90,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
@@ -168,7 +168,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
flags := frameR.ReadFlags()
if flags&byte(frame.ERROR) != byte(0) {
- return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5c0882b0..348f0459 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -178,7 +178,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess,
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
- return nil, errors.E(op, errors.ErrWatcherStopped)
+ return nil, errors.E(op, errors.WatcherStopped)
}
// handle worker remove state
@@ -198,7 +198,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess,
default:
w, stop = ww.stack.Pop()
if stop {
- return nil, errors.E(op, errors.ErrWatcherStopped)
+ return nil, errors.E(op, errors.WatcherStopped)
}
if w == nil {
continue
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go
new file mode 100644
index 00000000..6b116611
--- /dev/null
+++ b/plugins/kv/boltdb/config.go
@@ -0,0 +1,23 @@
+package boltdb
+
+type Config struct {
+ // Dir is a directory to store the DB files
+ Dir string
+ // File is boltDB file. No need to create it by your own,
+ // boltdb driver is able to create the file, or read existing
+ File string
+ // Bucket to store data in boltDB
+ Bucket string
+
+ Permissions int
+ TTL int
+}
+
+func (s *Config) InitDefaults() error {
+ s.Dir = "." // current dir
+ s.Bucket = "rr" // default bucket name
+ s.File = "rr.db" // default file name
+ s.Permissions = 0777 // free for all
+ s.TTL = 60 // 60 seconds is default TTL
+ return nil
+}
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go
new file mode 100644
index 00000000..e5eda0c2
--- /dev/null
+++ b/plugins/kv/boltdb/plugin.go
@@ -0,0 +1,476 @@
+package boltdb
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ bolt "go.etcd.io/bbolt"
+)
+
+const PluginName = "boltdb"
+
+// BoltDB K/V storage.
+type Plugin struct {
+ // db instance
+ DB *bolt.DB
+ // name should be UTF-8
+ bucket []byte
+
+ // config for RR integration
+ cfg *Config
+
+ // logger
+ log logger.Logger
+
+ // gc contains key which are contain timeouts
+ gc *sync.Map
+ // default timeout for cache cleanup is 1 minute
+ timeout time.Duration
+
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+}
+
+// NewBoltClient instantiate new BOLTDB client
+// The parameters are:
+// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
+// perm os.FileMode -- file permissions, for example 0777
+// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
+// bucket string -- name of the bucket to use, should be UTF-8
+func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
+ const op = errors.Op("newBoltClient")
+ db, err := bolt.Open(path, perm, options)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // bucket should be SET
+ if bucket == "" {
+ return nil, errors.E(op, errors.Str("bucket should be set"))
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ _, err = tx.CreateBucketIfNotExists([]byte(bucket))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if TTL is not set, make it default
+ if ttl == 0 {
+ ttl = time.Minute
+ }
+
+ s := &Plugin{
+ DB: db,
+ bucket: []byte(bucket),
+ stop: make(chan struct{}),
+ timeout: ttl,
+ gc: &sync.Map{},
+ }
+
+ // start the TTL gc
+ go s.gcPhase()
+
+ return s, nil
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ const op = errors.Op("boltdb plugin init")
+ s.cfg = &Config{}
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ s.log = log
+
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ const op = errors.Op("boltdb serve")
+ errCh := make(chan error, 1)
+
+ db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb Update")
+ _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ s.DB = db
+ s.bucket = []byte(s.cfg.Bucket)
+ s.stop = make(chan struct{})
+ s.timeout = time.Duration(s.cfg.TTL) * time.Second
+ s.gc = &sync.Map{}
+
+ // start the TTL gc
+ go s.gcPhase()
+
+ return errCh
+}
+
+func (s Plugin) Stop() error {
+ const op = errors.Op("boltdb stop")
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+ const op = errors.Op("boltdb Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+
+ // this is readable transaction
+ err := s.DB.View(func(tx *bolt.Tx) error {
+ // Get retrieves the value for a key in the bucket.
+ // Returns a nil value if the key does not exist or if the key is a nested bucket.
+ // The returned value is only valid for the life of the transaction.
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ exist := b.Get([]byte(key))
+ if exist != nil {
+ m[key] = true
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return m, nil
+}
+
+// Get retrieves the value for a key in the bucket.
+// Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
+func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+ const op = errors.Op("boltdb Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ var val []byte
+ err := s.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ val = b.Get([]byte(key))
+
+ // try to decode values
+ if val != nil {
+ buf := bytes.NewReader(val)
+ decoder := gob.NewDecoder(buf)
+
+ i := kv.Item{}
+ err := decoder.Decode(&i)
+ if err != nil {
+ // unsafe (w/o runes) convert
+ return errors.E(op, err)
+ }
+
+ // set the value
+ val = []byte(i.Value)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return val, nil
+}
+
+func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb MGet")
+ // defence
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ err := s.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ for _, key := range keys {
+ value := b.Get([]byte(key))
+ if value != nil {
+ m[key] = value
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return m, nil
+}
+
+// Set puts the K/V to the bolt
+func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("boltdb Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // start writable transaction
+ tx, err := s.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(s.bucket)
+ // use access by index to avoid copying
+ for i := range items {
+ // performance note: pass a prepared bytes slice with initial cap
+ // we can't move buf and gob out of loop, because we need to clear both from data
+ // but gob will contain (w/o re-init) the past data
+ buf := bytes.Buffer{}
+ encoder := gob.NewEncoder(&buf)
+ if errors.Is(errors.EmptyItem, err) {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ err = encoder.Encode(&items[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
+ err = b.Put([]byte(items[i].Key), buf.Bytes())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
+ // we do not need mutex here, since we use sync.Map
+ if items[i].TTL != "" {
+ // check correctness of provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.gc.Store(items[i].Key, items[i].TTL)
+ }
+
+ buf.Reset()
+ }
+
+ return nil
+}
+
+// Delete all keys from DB
+func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+ const op = errors.Op("boltdb Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ // start writable transaction
+ tx, err := s.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ for _, key := range keys {
+ err = b.Delete([]byte(key))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("boltdb MExpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.gc.Store(items[i].Key, items[i].TTL)
+ }
+ return nil
+}
+
+func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb TTL")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := s.gc.Load(keys[i]); ok {
+ // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
+ m[keys[i]] = item.(string)
+ }
+ }
+ return m, nil
+}
+
+// Close the DB connection
+func (s Plugin) Close() error {
+ // stop the keys GC
+ s.stop <- struct{}{}
+ return s.DB.Close()
+}
+
+// ========================= PRIVATE =================================
+
+func (s Plugin) gcPhase() {
+ t := time.NewTicker(s.timeout)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ // calculate current time before loop started to be fair
+ now := time.Now()
+ s.gc.Range(func(key, value interface{}) bool {
+ const op = errors.Op("gcPhase")
+ k := key.(string)
+ v, err := time.Parse(time.RFC3339, value.(string))
+ if err != nil {
+ return false
+ }
+
+ if now.After(v) {
+ // time expired
+ s.gc.Delete(k)
+ err := s.DB.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ err := b.Delete([]byte(k))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ s.log.Error("error during the gc phase of update", "error", err)
+ // todo this error is ignored, it means, that timer still be active
+ // to prevent this, we need to invoke t.Stop()
+ return false
+ }
+ }
+ return true
+ })
+ case <-s.stop:
+ return
+ }
+ }
+}
diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go
new file mode 100644
index 00000000..a12c830d
--- /dev/null
+++ b/plugins/kv/boltdb/plugin_unit_test.go
@@ -0,0 +1,485 @@
+package boltdb
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/stretchr/testify/assert"
+)
+
+func initStorage() kv.Storage {
+ storage, err := NewBoltClient("rr.db", 0777, nil, "rr", time.Second)
+ if err != nil {
+ panic(err)
+ }
+ return storage
+}
+
+func cleanup(t *testing.T, path string) {
+ err := os.RemoveAll(path)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has(context.Background(), "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ // set is writable transaction
+ // it should stop readable
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world2",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get(ctx, "key")
+ assert.NoError(t, err)
+
+ if string(res) != "hello world" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ // check that keys are not present
+ res, err = s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+ ctx := context.Background()
+
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ // check
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has(ctx, "")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, "")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, " ")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, " ")
+ assert.Error(t, err)
+
+ _, err = s.MGet(ctx, "key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet(ctx, "key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ assert.Error(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "asdf",
+ }))
+
+ _, err = s.Has(ctx, "key")
+ assert.NoError(t, err)
+
+ assert.Error(t, s.Set(ctx, kv.Item{}))
+
+ err = s.Delete(ctx, "")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key", "")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(ctx, i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 2)
+ m, err := s.TTL(ctx, "key", "key2")
+ assert.NoError(t, err)
+
+ // remove a precision 4.02342342 -> 4
+ keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // remove a precision 4.02342342 -> 4
+ key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.True(t, keyTTL < 5)
+ assert.True(t, key2TTL < 5)
+
+ time.Sleep(time.Second * 4)
+
+ // ensure that storage is clean
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
new file mode 100644
index 00000000..f17754e6
--- /dev/null
+++ b/plugins/kv/interface.go
@@ -0,0 +1,45 @@
+package kv
+
+// Item represents general storage item
+import (
+ "context"
+)
+
+type Item struct {
+ // Key of item
+ Key string
+ // Value of item
+ Value string
+ // live until time provided by TTL in RFC 3339 format
+ TTL string
+}
+
+// Storage represents single abstract storage.
+type Storage interface {
+ // Has checks if value exists.
+ Has(ctx context.Context, keys ...string) (map[string]bool, error)
+
+ // Get loads value content into a byte slice.
+ Get(ctx context.Context, key string) ([]byte, error)
+
+ // MGet loads content of multiple values
+ // If there are no values for keys, key will no be in the map
+ MGet(ctx context.Context, keys ...string) (map[string]interface{}, error)
+
+ // Set used to upload item to KV with TTL
+ // 0 value in TTL means no TTL
+ Set(ctx context.Context, items ...Item) error
+
+ // MExpire sets the TTL for multiply keys
+ MExpire(ctx context.Context, items ...Item) error
+
+ // TTL return the rest time to live for provided keys
+ // Not supported for the memcached and boltdb
+ TTL(ctx context.Context, keys ...string) (map[string]interface{}, error)
+
+ // Delete one or multiple keys.
+ Delete(ctx context.Context, keys ...string) error
+
+ // Close closes the storage and underlying resources.
+ Close() error
+} \ No newline at end of file
diff --git a/plugins/kv/memcached/config.go b/plugins/kv/memcached/config.go
new file mode 100644
index 00000000..62f29ef2
--- /dev/null
+++ b/plugins/kv/memcached/config.go
@@ -0,0 +1,10 @@
+package memcached
+
+type Config struct {
+ // Addr is url for memcached, 11211 port is used by default
+ Addr []string
+}
+
+func (s *Config) InitDefaults() {
+ s.Addr = []string{"localhost:11211"} // default url for memcached // init logger
+}
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go
new file mode 100644
index 00000000..bd0a207d
--- /dev/null
+++ b/plugins/kv/memcached/plugin.go
@@ -0,0 +1,243 @@
+package memcached
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/bradfitz/gomemcache/memcache"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "memcached"
+
+var EmptyItem = kv.Item{}
+
+type Plugin struct {
+ // config
+ cfg *Config
+ // logger
+ log logger.Logger
+ // memcached client
+ client *memcache.Client
+}
+
+// NewMemcachedClient returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func NewMemcachedClient(url string) kv.Storage {
+ m := memcache.New(url)
+ return &Plugin{
+ client: m,
+ }
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ const op = errors.Op("memcached init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.log = log
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ s.client = memcache.New(s.cfg.Addr...)
+ return errCh
+}
+
+// Memcached has no stop/close or smt similar to close the connection
+func (s *Plugin) Stop() error {
+ return nil
+}
+
+// Has checks the key for existence
+func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+ const op = errors.Op("memcached Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool, len(keys))
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ exist, err := s.client.Get(key)
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return nil, err
+ }
+ if exist != nil {
+ m[key] = true
+ }
+ }
+ return m, nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+ const op = errors.Op("memcached Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ data, err := s.client.Get(key)
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return nil, err
+ }
+ if data != nil {
+ // return the value by the key
+ return data.Value, nil
+ }
+ // data is nil by some reason and error also nil
+ return nil, nil
+}
+
+// return map with key -- string
+// and map value as value -- []byte
+func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached MGet")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+ for _, key := range keys {
+ // Here also MultiGet
+ data, err := s.client.Get(key)
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return nil, err
+ }
+ if data != nil {
+ m[key] = data.Value
+ }
+ }
+
+ return m, nil
+}
+
+// Set sets the KV pairs. Keys should be 250 bytes maximum
+// TTL:
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("memcached Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ if items[i] == EmptyItem {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // pre-allocate item
+ memcachedItem := &memcache.Item{
+ Key: items[i].Key,
+ // unsafe convert
+ Value: []byte(items[i].Value),
+ Flags: 0,
+ }
+
+ // add additional TTL in case of TTL isn't empty
+ if items[i].TTL != "" {
+ // verify the TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ memcachedItem.Expiration = int32(t.Unix())
+ }
+
+ err := s.client.Set(memcachedItem)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("memcached MExpire")
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+
+ // Touch updates the expiry for the given key. The seconds parameter is either
+ // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+ // into the future at which time the item will expire. Zero means the item has
+ // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
+ // The key must be at most 250 bytes in length.
+ err = s.client.Touch(item.Key, int32(t.Unix()))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// return time in seconds (int32) for a given keys
+func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached HTTLas")
+ return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
+}
+
+func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+ const op = errors.Op("memcached Has")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for _, key := range keys {
+ err := s.client.Delete(key)
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s Plugin) Close() error {
+ return nil
+}
diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go
new file mode 100644
index 00000000..4b59bbd0
--- /dev/null
+++ b/plugins/kv/memcached/storage_test.go
@@ -0,0 +1,444 @@
+package memcached
+
+import (
+ "context"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/stretchr/testify/assert"
+)
+
+func initStorage() kv.Storage {
+ return NewMemcachedClient("localhost:11211")
+}
+
+func cleanup(t *testing.T, s kv.Storage, keys ...string) {
+ err := s.Delete(context.Background(), keys...)
+ if err != nil {
+ t.Fatalf("error during cleanup: %s", err.Error())
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+
+ ctx := context.Background()
+
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get(ctx, "key")
+ assert.NoError(t, err)
+
+ if string(res) != "hello world" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ // check that keys are not present
+ res, err = s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(ctx, i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ cleanup(t, s, "key")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ // check
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has(ctx, "")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, "")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, " ")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, " ")
+ assert.Error(t, err)
+
+ _, err = s.MGet(ctx, "key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet(ctx, "key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.Error(t, s.Set(ctx, kv.Item{}))
+
+ err = s.Delete(ctx, "")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key", "")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ // set is writable transaction
+ // it should stop readable
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}
diff --git a/plugins/kv/memory/config.go b/plugins/kv/memory/config.go
new file mode 100644
index 00000000..329e7fff
--- /dev/null
+++ b/plugins/kv/memory/config.go
@@ -0,0 +1,12 @@
+package memory
+
+// Config is default config for the in-memory driver
+type Config struct {
+ // Enabled or disabled (true or false)
+ Enabled bool
+}
+
+// InitDefaults by default driver is turned off
+func (c *Config) InitDefaults() {
+ c.Enabled = false
+}
diff --git a/plugins/kv/memory/storage.go b/plugins/kv/memory/storage.go
new file mode 100644
index 00000000..1b6cb580
--- /dev/null
+++ b/plugins/kv/memory/storage.go
@@ -0,0 +1,263 @@
+package memory
+
+import (
+ "context"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "memory"
+
+type Plugin struct {
+ heap *sync.Map
+ stop chan struct{}
+
+ log logger.Logger
+ cfg *Config
+}
+
+func NewInMemoryStorage() kv.Storage {
+ p := &Plugin{
+ heap: &sync.Map{},
+ stop: make(chan struct{}),
+ }
+
+ go p.gc()
+
+ return p
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("in-memory storage init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.log = log
+ // init in-memory
+ s.heap = &sync.Map{}
+ s.stop = make(chan struct{}, 1)
+ return nil
+}
+
+func (s Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ // start in-memory gc for kv
+ go s.gc()
+
+ return errCh
+}
+
+func (s Plugin) Stop() error {
+ const op = errors.Op("in-memory storage stop")
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+ const op = errors.Op("in-memory storage Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool)
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if _, ok := s.heap.Load(key); ok {
+ m[key] = true
+ }
+ }
+
+ return m, nil
+}
+
+func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+ const op = errors.Op("in-memory storage Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if data, exist := s.heap.Load(key); exist {
+ // here might be a panic
+ // but data only could be a string, see Set function
+ return []byte(data.(kv.Item).Value), nil
+ }
+ return nil, nil
+}
+
+func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage MGet")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ if value, ok := s.heap.Load(key); ok {
+ m[key] = value
+ }
+ }
+
+ return m, nil
+}
+
+func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("in-memory storage Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for _, item := range items {
+ // TTL is set
+ if item.TTL != "" {
+ // check the TTL in the item
+ _, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+ }
+
+ s.heap.Store(item.Key, item)
+ }
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("in-memory storage MExpire")
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // if key exist, overwrite it value
+ if pItem, ok := s.heap.Load(item.Key); ok {
+ // check that time is correct
+ _, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ tmp := pItem.(kv.Item)
+ // guess that t is in the future
+ // in memory is just FOR TESTING PURPOSES
+ // LOGIC ISN'T IDEAL
+ s.heap.Store(item.Key, kv.Item{
+ Key: item.Key,
+ Value: tmp.Value,
+ TTL: item.TTL,
+ })
+ }
+ }
+
+ return nil
+}
+
+func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage TTL")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ if item, ok := s.heap.Load(key); ok {
+ m[key] = item.(kv.Item).TTL
+ }
+ }
+ return m, nil
+}
+
+func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+ const op = errors.Op("in-memory storage Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for _, key := range keys {
+ s.heap.Delete(key)
+ }
+ return nil
+}
+
+// Close clears the in-memory storage
+func (s Plugin) Close() error {
+ s.heap = &sync.Map{}
+ s.stop <- struct{}{}
+ return nil
+}
+
+// ================================== PRIVATE ======================================
+
+func (s *Plugin) gc() {
+ // TODO check
+ ticker := time.NewTicker(time.Millisecond * 500)
+ for {
+ select {
+ case <-s.stop:
+ ticker.Stop()
+ return
+ case now := <-ticker.C:
+ // check every second
+ s.heap.Range(func(key, value interface{}) bool {
+ v := value.(kv.Item)
+ if v.TTL == "" {
+ return true
+ }
+
+ t, err := time.Parse(time.RFC3339, v.TTL)
+ if err != nil {
+ return false
+ }
+
+ if now.After(t) {
+ s.heap.Delete(key)
+ }
+ return true
+ })
+ }
+ }
+}
diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go
new file mode 100644
index 00000000..b7b46637
--- /dev/null
+++ b/plugins/kv/memory/storage_test.go
@@ -0,0 +1,470 @@
+package memory
+
+import (
+ "context"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/stretchr/testify/assert"
+)
+
+func initStorage() kv.Storage {
+ return NewInMemoryStorage()
+}
+
+func cleanup(t *testing.T, s kv.Storage, keys ...string) {
+ err := s.Delete(context.Background(), keys...)
+ if err != nil {
+ t.Fatalf("error during cleanup: %s", err.Error())
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+
+ ctx := context.Background()
+
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get(ctx, "key")
+ assert.NoError(t, err)
+
+ if string(res) != "value" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ // check that keys are not presentps -eo state,uid,pid,ppid,rtprio,time,comm
+ res, err = s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ res, err := s.MGet(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(ctx, i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ // check
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has(ctx, "")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, "")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, " ")
+ assert.Error(t, err)
+
+ _, err = s.Get(ctx, " ")
+ assert.Error(t, err)
+
+ _, err = s.MGet(ctx, "key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet(ctx, "key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.NoError(t, s.Set(ctx, kv.Item{}))
+ _, err = s.Has(ctx, "key")
+ assert.NoError(t, err)
+
+ err = s.Delete(ctx, "")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key", "")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete(ctx, "key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ ctx := context.Background()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 2)
+ m, err := s.TTL(ctx, "key", "key2")
+ assert.NoError(t, err)
+
+ // remove a precision 4.02342342 -> 4
+ keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // remove a precision 4.02342342 -> 4
+ key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.True(t, keyTTL < 5)
+ assert.True(t, key2TTL < 5)
+
+ time.Sleep(time.Second * 4)
+
+ // ensure that storage is clean
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ ctx := context.Background()
+ v, err := s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has(ctx, "key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ // set is writable transaction
+ // it should stop readable
+ assert.NoError(t, s.Set(ctx, kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has(ctx, "key")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index 452e03a3..eb1b61b2 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -57,7 +57,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Res
return nil
}
}
- return errors.E(op, errors.Skip)
+ return errors.E(op, errors.SkipFile)
},
Files: make(map[string]os.FileInfo),
Ignored: ignored,
diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go
index c232f16f..08c85af9 100644
--- a/plugins/reload/watcher.go
+++ b/plugins/reload/watcher.go
@@ -179,7 +179,7 @@ outer:
// if filename does not contain pattern --> ignore that file
if w.watcherConfigs[serviceName].FilePatterns != nil && w.watcherConfigs[serviceName].FilterHooks != nil {
err = w.watcherConfigs[serviceName].FilterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].FilePatterns)
- if errors.Is(errors.Skip, err) {
+ if errors.Is(errors.SkipFile, err) {
continue outer
}
}
@@ -293,7 +293,7 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o
// if filename does not contain pattern --> ignore that file
err = w.watcherConfigs[serviceName].FilterHooks(info.Name(), w.watcherConfigs[serviceName].FilePatterns)
- if errors.Is(errors.Skip, err) {
+ if errors.Is(errors.SkipFile, err) {
return nil
}
diff --git a/tests/plugins/http/plugin_middleware.go b/tests/plugins/http/plugin_middleware.go
index 8d02524d..00640b69 100644
--- a/tests/plugins/http/plugin_middleware.go
+++ b/tests/plugins/http/plugin_middleware.go
@@ -6,15 +6,18 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
)
+// PluginMiddleware test
type PluginMiddleware struct {
config config.Configurer
}
+// Init test
func (p *PluginMiddleware) Init(cfg config.Configurer) error {
p.config = cfg
return nil
}
+// Middleware test
func (p *PluginMiddleware) Middleware(next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/halt" {
@@ -29,19 +32,23 @@ func (p *PluginMiddleware) Middleware(next http.Handler) http.HandlerFunc {
}
}
+// Name test
func (p *PluginMiddleware) Name() string {
return "pluginMiddleware"
}
+// PluginMiddleware2 test
type PluginMiddleware2 struct {
config config.Configurer
}
+// Init test
func (p *PluginMiddleware2) Init(cfg config.Configurer) error {
p.config = cfg
return nil
}
+// Middleware test
func (p *PluginMiddleware2) Middleware(next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/boom" {
@@ -56,6 +63,7 @@ func (p *PluginMiddleware2) Middleware(next http.Handler) http.HandlerFunc {
}
}
+// Name test
func (p *PluginMiddleware2) Name() string {
return "pluginMiddleware2"
}