Diffstat (limited to 'plugins')
-rw-r--r--  plugins/config/interface.go  2
-rwxr-xr-x  plugins/config/plugin.go  2
-rw-r--r--  plugins/kv/boltdb/plugin_unit_test.go  531
-rw-r--r--  plugins/kv/config.go  5
-rw-r--r--  plugins/kv/doc/kv.drawio  1
-rw-r--r--  plugins/kv/drivers/boltdb/config.go (renamed from plugins/kv/boltdb/config.go)  0
-rw-r--r--  plugins/kv/drivers/boltdb/driver.go (renamed from plugins/kv/boltdb/plugin.go)  234
-rw-r--r--  plugins/kv/drivers/boltdb/plugin.go  65
-rw-r--r--  plugins/kv/drivers/memcached/config.go (renamed from plugins/kv/memcached/config.go)  0
-rw-r--r--  plugins/kv/drivers/memcached/driver.go (renamed from plugins/kv/memcached/plugin.go)  95
-rw-r--r--  plugins/kv/drivers/memcached/plugin.go  44
-rw-r--r--  plugins/kv/drivers/memory/config.go (renamed from plugins/kv/memory/config.go)  0
-rw-r--r--  plugins/kv/drivers/memory/driver.go (renamed from plugins/kv/memory/plugin.go)  83
-rw-r--r--  plugins/kv/drivers/memory/plugin.go  64
-rw-r--r--  plugins/kv/drivers/redis/config.go  34
-rw-r--r--  plugins/kv/drivers/redis/driver.go  239
-rw-r--r--  plugins/kv/drivers/redis/plugin.go  51
-rw-r--r--  plugins/kv/interface.go  12
-rw-r--r--  plugins/kv/memcached/plugin_unit_test.go  432
-rw-r--r--  plugins/kv/memory/plugin_unit_test.go  472
-rw-r--r--  plugins/kv/payload/generated/Item.go  67
-rw-r--r--  plugins/kv/payload/generated/Payload.go  71
-rw-r--r--  plugins/kv/payload/payload.fbs  14
-rw-r--r--  plugins/kv/rpc.go  245
-rw-r--r--  plugins/kv/storage.go  182
-rw-r--r--  plugins/redis/interface.go  2
-rw-r--r--  plugins/resetter/plugin.go  2
-rw-r--r--  plugins/rpc/interface.go  2
-rw-r--r--  plugins/rpc/plugin.go  29
29 files changed, 1192 insertions, 1788 deletions
diff --git a/plugins/config/interface.go b/plugins/config/interface.go
index 8370c0ab..59ad981f 100644
--- a/plugins/config/interface.go
+++ b/plugins/config/interface.go
@@ -1,7 +1,7 @@
package config
type Configurer interface {
- // // UnmarshalKey takes a single key and unmarshals it into a Struct.
+ // UnmarshalKey takes a single key and unmarshals it into a Struct.
//
// func (h *HttpService) Init(cp config.Configurer) error {
// h.config := &HttpConfig{}
diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go
index 09fd35bb..58647eb8 100755
--- a/plugins/config/plugin.go
+++ b/plugins/config/plugin.go
@@ -23,7 +23,7 @@ type Viper struct {
CommonConfig *General
}
-// Inits config provider.
+// Init config provider.
func (v *Viper) Init() error {
const op = errors.Op("config_plugin_init")
v.viper = viper.New()
diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go
deleted file mode 100644
index ad3843e7..00000000
--- a/plugins/kv/boltdb/plugin_unit_test.go
+++ /dev/null
@@ -1,531 +0,0 @@
-package boltdb
-
-import (
- "os"
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/stretchr/testify/assert"
- bolt "go.etcd.io/bbolt"
- "go.uber.org/zap"
-)
-
-// NewBoltClient instantiate new BOLTDB client
-// The parameters are:
-// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
-// perm os.FileMode -- file permissions, for example 0777
-// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
-// bucket string -- name of the bucket to use, should be UTF-8
-func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_new_bolt_client")
- db, err := bolt.Open(path, perm, options)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // bucket should be SET
- if bucket == "" {
- return nil, errors.E(op, errors.Str("bucket should be set"))
- }
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- _, err = tx.CreateBucketIfNotExists([]byte(bucket))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // if TTL is not set, make it default
- if ttl == 0 {
- ttl = time.Minute
- }
-
- l, _ := zap.NewDevelopment()
- s := &Plugin{
- DB: db,
- bucket: []byte(bucket),
- stop: make(chan struct{}),
- timeout: ttl,
- gc: &sync.Map{},
- log: logger.NewZapAdapter(l),
- }
-
- // start the TTL gc
- go s.gcPhase()
-
- return s, nil
-}
-
-func initStorage() kv.Storage {
- storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second)
- if err != nil {
- panic(err)
- }
- return storage
-}
-
-func cleanup(t *testing.T, path string) {
- err := os.RemoveAll(path)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStorage_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-}
-
-func TestStorage_Has_Set_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-}
-
-func TestConcurrentReadWriteTransactions(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- wg := &sync.WaitGroup{}
- wg.Add(3)
-
- m := &sync.RWMutex{}
- // concurrently set the keys
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- // set is writable transaction
- // it should stop readable
- assert.NoError(t, s.Set(kv.Item{
- Key: "key" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }, kv.Item{
- Key: "key2" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }))
- m.Unlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.RLock()
- v, err = s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- m.RUnlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- err = s.Delete("key" + strconv.Itoa(i))
- assert.NoError(t, err)
- m.Unlock()
- }
- }(s)
-
- wg.Wait()
-}
-
-func TestStorage_Has_Set_MGet(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_Has_Set_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world2",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
-
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.Get("key")
- assert.NoError(t, err)
-
- if string(res) != "hello world" {
- t.Fatal("wrong value by key")
- }
-}
-
-func TestStorage_Set_Del_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- // check that keys are present
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-
- assert.NoError(t, s.Delete("key", "key2"))
- // check that keys are not present
- res, err = s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 0)
-}
-
-func TestStorage_Set_GetM(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestNilAndWrongArgs(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- // check
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- _, err = s.Has("")
- assert.Error(t, err)
-
- _, err = s.Get("")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", "")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", " ")
- assert.Error(t, err)
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }))
-
- assert.Error(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "asdf",
- }))
-
- _, err = s.Has("key")
- assert.NoError(t, err)
-
- assert.Error(t, s.Set(kv.Item{}))
-
- err = s.Delete("")
- assert.Error(t, err)
-
- err = s.Delete("key", "")
- assert.Error(t, err)
-
- err = s.Delete("key", " ")
- assert.Error(t, err)
-
- err = s.Delete("key")
- assert.NoError(t, err)
-}
-
-func TestStorage_MExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
- // set timeout to 5 sec
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- i1 := kv.Item{
- Key: "key",
- Value: "",
- TTL: nowPlusFive,
- }
- i2 := kv.Item{
- Key: "key2",
- Value: "",
- TTL: nowPlusFive,
- }
- assert.NoError(t, s.MExpire(i1, i2))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestStorage_SetExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- cleanup(t, "rr.db")
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- // set timeout to 5 sec
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: nowPlusFive,
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: nowPlusFive,
- }))
-
- time.Sleep(time.Second * 2)
- m, err := s.TTL("key", "key2")
- assert.NoError(t, err)
-
- // remove a precision 4.02342342 -> 4
- keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- // remove a precision 4.02342342 -> 4
- key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- assert.True(t, keyTTL < 5)
- assert.True(t, key2TTL < 5)
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
diff --git a/plugins/kv/config.go b/plugins/kv/config.go
new file mode 100644
index 00000000..9ecae644
--- /dev/null
+++ b/plugins/kv/config.go
@@ -0,0 +1,5 @@
+package kv
+
+type Config struct {
+ Data map[string]interface{} `mapstructure:"kv"`
+}
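
Note: the new top-level kv Config only captures the raw `kv` section as a map; resolving which driver serves each configured storage happens elsewhere (plugins/kv/storage.go, not shown in this section). Below is a minimal, hypothetical sketch of walking that map, assuming each entry carries a `driver` field; the names and layout are illustrative and not taken from this diff.

// Hypothetical illustration only: walks a kv Config.Data map and reads the
// "driver" field of each storage entry. Entry names and layout are assumptions.
package main

import "fmt"

type Config struct {
	Data map[string]interface{} `mapstructure:"kv"`
}

func driversFor(cfg *Config) map[string]string {
	out := make(map[string]string)
	for name, raw := range cfg.Data {
		if section, ok := raw.(map[string]interface{}); ok {
			if drv, ok := section["driver"].(string); ok {
				out[name] = drv // e.g. "local" -> "boltdb"
			}
		}
	}
	return out
}

func main() {
	cfg := &Config{Data: map[string]interface{}{
		"local": map[string]interface{}{"driver": "boltdb"},
	}}
	fmt.Println(driversFor(cfg))
}
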
diff --git a/plugins/kv/doc/kv.drawio b/plugins/kv/doc/kv.drawio
new file mode 100644
index 00000000..04470e4a
--- /dev/null
+++ b/plugins/kv/doc/kv.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-04-22T21:31:28.320Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.5 Safari/537.36" etag="PMNN2QoTRBeugwC1WCGf" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7V1Zc6O4Fv41rp55sAsQi3lMHKeTmaQ7lXRmpp9uySDbTLDlBjmJ59dficWAJO8GvLmrYyMkAeccfTqLdGiAzujzawAnw0fsIr+hKe5nA9w0NE1TTI1+sZJZXKJqqhGXDALPTcqyghfvP5QUKknp1HNRWKhIMPaJNykWOng8Rg4plMEgwB/Fan3sF686gQMkFLw40BdL//ZcMoxLdbOdO3GHvMEwuTQASnLnI5jWTgrCIXTxR64IdBugE2BM4l+jzw7yGflSwsTtbhecnd9AgMZknQb62+30l3L1+b8/vk0ME0B/YipNI+kmJLP0kZFLKZAc4oAM8QCPod/NSq8DPB27iHWr0KOszgPGE1qo0sJ/ESGzhJ1wSjAtGpKRn5xFnx75J/f7J+uqZSRHN59Jz9HBLD0Yk2D2T1aRHf5M+2AHWbPoKG0XEhiQKyYKtGCMxygtu/V8f97CTWs4PgxDz4kLkyrsEn08JsnzqDo7pqc62MdBRDRgKtBWrajrAL+h3BntxjIVdpGY0Iy6CxmYMgNPAyep9T57/HDIt/G/Duh13bfO+633q5mOBBgMEFnCXXUuZnSEIjxClDC0XYB8SLz34n3AZKQM5vUyWaI/EnHaRLS0OkUrE6efeWmSitaZiIi0nrGjiERNKWXgLFdhgr0xCXM9P7ECWiGZGFTVTkAxmRfaBoddfAPQXtqA/ojvIRPX+cPsIMHgIsGHL8HgICTYAhtKMNdgZwleRsN36E8TQvz5Fz1+enj9ev9NEO6i6H4MPYJeJjBi0gfV74piysmGC1G778hkw3TaqNeXSNdC0XhHAUGfuSKRu3M26EWq2gkuKB+ZtqamKtkwp6il7fY+7bVrxYxtFKqm0lKUuRoVN1WNuZK1QK+Kjp5Q4FGyoUDEmASFMoARcCpDoTxUqftFEbCmqrQriuwkM6p9fFp4UV5WSUsJE1T5giKH1HZFkrLsLnOAfv394cfNdTmgjlTXQNIJ3zYtAM2SQB1oRVA3dQmoaxJQN8sC9VrH5/agDrhBatnbgPqaoL0K+/c4Vo2KQH0r1TA1TeaKnr1cM+Trq5pulW/bqOax2DY5xw+w9YLrp6Xoq9w/qyX6IGejdSWcYm6NeotWq4vnordsIik5n3MNeosh6C2P3cfOVeeue3PSqotl1q266LV6+JWWtgXSc8aoVv0oZUzmRUrtQRVpMpFSFLN7dbvfcW2vq+NUNa7l0mUKA1s+JzzAHvKLMgJ9bzBm7KC0YVPzNRtnngP9q+TEyHPdWBxR6P0He1F/jJeJCkY7N64bxo2EdQsG7jxQmHTWmEfn8ixaMo4Wjnyqb6sAgMLg1+OjzdRNQT8ESss0Cv02Nb2lmMV+cL8fUjnh+bkHTVGv1QtenOStbWb5A1ERjwhSrDrsJg1whlCbD2ivbFBFUEi3BcTTTxTxFpjPGeIZtm0WkWk/kGe3Uq0ltYmNlm2VAHjLhkmOwc9PtCfl+vVlryrqGuE0xuRceT/6lKO6agqvutprqq6lhVKMw5l4yrEu94jtqsQOXBwSrMsMVEU7sBUErRmk5NdMn973dS8osNz8NWULpCKJb4YR065oBVWffEYETM/TXwMyJ2lOZDYbk3R4aY40jOmaPdMoyWy0zTWimGUNvSVRjgKj9op9/baD5HTutQ3dUPZDV9VSDi2SkC5sPKJQgiQ8bLWPL5KweIXcwUYSOOm1VyjEfH0VJMO43EiCVatEbxdJ2GgJ6eHHEHaSbXXXNYC7xRAOyL1wBjGEnSRl0XRb0SoZQSV57t7cvxxY/EBigQssWT9+0F7XCCsvfnA06E4PxPVo2yH+HsfW2uuKakVhcV1Rao+53vtaBplBDTKJNRZ3wirTSn3oFDvpwNCBrgfzLbe91tt7dDa+ICVCfM3ifdDi6HmKpUfziPQbjhhujXvhJO6ph33i9ppUDBk8nN/zS4rcgA6XIO4nJs+FKpQqXkKSrAOCQmZ+3AZBk9Gp0PPZ04vqA4gnWBC0ImG6kCnDn6nzxuY1nlAXKuWLJigYeVSrx+Mw7syyrAtdqFZE/LgTXQlPnh5z97bwYKf5vDyzR2jkQGeI3OY0vKgqgqoyJ8+FMNTAcd1UXTGuC1OLjx3oD3HI5hxV1VS1cFMsNn7q5Dt3HAmQ610ARASQmC4XokTgcRGQhUVN+v9LHkZNYNlf6iOWFMoKF+MckOEQTlDkSyfM27jCsduDztsg8kh+nxLfG6Ok3IXB23fayiOR16+lMA9hukLLR/3oObiotcH+NWSbnKNPQ1ixE3/mD7GTc9hWistAVUWyTEADEucwKG2Jjipf7VuLezg9s7t7OHMIrxHR2T6OXWZEp6rsCbtJjxjSWYUsTjy2ovlu0PuN5S9ia/Oy799jkOMRSF24XqgUxF5yuUP21Z0Fdav27J0FUSv2A54FTUv3Gp4FFXfwMdand0o1g92WFhhqcWnB/Di/tECmPdqlKY9i1PkZOYipHfTJ/OnAG3+hLFPwx7gR7WPoe4P4mYnAG0oY0ihtswVTQcOEHyWxBwCOPbKVH5aEPVpp7NH3oZ0pGaYoGaxoymJk2QusxC1b9HPWY14vmoxmWxQqU2YwGmXJlLgB4CJTxy1T7ZSndclUCoC171XYja78XoX6UwdoMvP8+Alb/5pKTZxZnQBBwvQeijjKGH005r79g9F09H0BCOfHtKw1FdHSNB1NnJWW8INOMOwvgs4wp5aeGpd0g1Md1nU2l8clMdfEOY0aznyzldpHjXUa0wNHWJmgVzs9gKPZEnP4W1u0ZCPrykCILZeSijZBidtt47yezZfvrz/u6Jmb5/u/us9Vj68ygMzis3vWr+eKGR9OAcikelW1QFbr5qHL3r4sP9ihA6AujrjDTzKVS0xXz5totCpTStUrXVttoFdtzqneTiy9hW9pEBqseDGJptSSg0pUF+Qq4/HnoNKWznl0CFoqlxwvocSOKahUrWVxOajMlq0Xuykv6V4qhrKEqU2Wh4pfOne6+qEliVlUrMaciB90HQ9CtYQ9wgxftb99bt/6oXYM+mF6l5LkD6eLfLZshUa1A1R0uT52n792adHVwwP9++OuO2cBY8bf9z/uWCYO5fXb/e19lNx7cfLE2nyy5p4Y1rY4V4bEdajLGFaaTxYc0OunAPdZDrDLlmfXnw9claBExZaWVRFOb2VpaQaXwjDNRVaqHQREx93be+sOhtJBcPTGEFjw6qlcCnLD4BLyJvPmjtZQUwXtFtDs7NMuXibNF1lBO
nIxdi5/w8EZsNsCls3xYTd2V2DYikHD11ASr03XSE1H/pVDcJ5ZEWOf6NxOPMyY1sOE4JGEmwRzah6OF1B15i8Ll+3C4ZZQ7UFPEMBRllpZFrotL6mX6F54gjMfQze3bXKDTLzCIjm1RU+9UL6x96tzW/+FJdq8L0Njje8JGtGv397QLF7iF99r9JMQ//cNtMmEkwcd36fqBCcjQKL8t6sUEkMUkmcUTvA4RBsQv1xVfjn0bkB+4Y3UsqikKiE/2Jz8bBsXZsMlA1ZqkQ4fscsmrO7/AQ==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/drivers/boltdb/config.go
index ebe73c25..ebe73c25 100644
--- a/plugins/kv/boltdb/config.go
+++ b/plugins/kv/drivers/boltdb/config.go
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/drivers/boltdb/driver.go
index ffcbc85a..b596d4c3 100644
--- a/plugins/kv/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -16,23 +16,15 @@ import (
bolt "go.etcd.io/bbolt"
)
-const PluginName = "boltdb"
-
-// BoltDB K/V storage.
-type Plugin struct {
+type Driver struct {
// db instance
DB *bolt.DB
// name should be UTF-8
bucket []byte
-
- // config for RR integration
- cfg *Config
-
- // logger
- log logger.Logger
-
+ log logger.Logger
+ cfg *Config
// gc contains key which are contain timeouts
- gc *sync.Map
+ gc sync.Map
// default timeout for cache cleanup is 1 minute
timeout time.Duration
@@ -40,34 +32,38 @@ type Plugin struct {
stop chan struct{}
}
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- const op = errors.Op("boltdb_plugin_init")
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
+ d := &Driver{
+ log: log,
+ stop: stop,
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
if err != nil {
- return errors.E(op, errors.Disabled, err)
+ return nil, errors.E(op, err)
}
- // add default values
- s.cfg.InitDefaults()
+ d.bucket = []byte(d.cfg.Bucket)
+ d.timeout = time.Duration(d.cfg.Interval) * time.Second
+ d.gc = sync.Map{}
- // set the logger
- s.log = log
+ // add default values
+ d.cfg.InitDefaults()
- db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
+ db, err := bolt.Open(path.Join(d.cfg.Dir, d.cfg.File), os.FileMode(d.cfg.Permissions), nil)
if err != nil {
- return errors.E(op, err)
+ return nil, errors.E(op, err)
}
+ d.DB = db
+
// create bucket if it does not exist
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket))
+ _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.Bucket))
if err != nil {
return errors.E(op, upOp)
}
@@ -75,38 +71,17 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
})
if err != nil {
- return errors.E(op, err)
+ return nil, errors.E(op, err)
}
- s.DB = db
- s.bucket = []byte(s.cfg.Bucket)
- s.stop = make(chan struct{})
- s.timeout = time.Duration(s.cfg.Interval) * time.Second
- s.gc = &sync.Map{}
-
- return nil
-}
-
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- // start the TTL gc
- go s.gcPhase()
-
- return errCh
-}
+ go d.startGCLoop()
-func (s *Plugin) Stop() error {
- const op = errors.Op("boltdb_plugin_stop")
- err := s.Close()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
+ return d, nil
}
-func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("boltdb_plugin_has")
- s.log.Debug("boltdb HAS method called", "args", keys)
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("boltdb_driver_has")
+ d.log.Debug("boltdb HAS method called", "args", keys)
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
@@ -114,7 +89,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
m := make(map[string]bool, len(keys))
// this is readable transaction
- err := s.DB.View(func(tx *bolt.Tx) error {
+ err := d.DB.View(func(tx *bolt.Tx) error {
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
@@ -123,7 +98,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
if keyTrimmed == "" {
return errors.E(op, errors.EmptyKey)
}
- b := tx.Bucket(s.bucket)
+ b := tx.Bucket(d.bucket)
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
@@ -138,15 +113,15 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, err)
}
- s.log.Debug("boltdb HAS method finished")
+ d.log.Debug("boltdb HAS method finished")
return m, nil
}
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
-func (s *Plugin) Get(key string) ([]byte, error) {
- const op = errors.Op("boltdb_plugin_get")
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("boltdb_driver_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
if keyTrimmed == "" {
@@ -154,8 +129,8 @@ func (s *Plugin) Get(key string) ([]byte, error) {
}
var val []byte
- err := s.DB.View(func(tx *bolt.Tx) error {
- b := tx.Bucket(s.bucket)
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
@@ -185,8 +160,8 @@ func (s *Plugin) Get(key string) ([]byte, error) {
return val, nil
}
-func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_mget")
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb_driver_mget")
// defense
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -202,8 +177,8 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
m := make(map[string]interface{}, len(keys))
- err := s.DB.View(func(tx *bolt.Tx) error {
- b := tx.Bucket(s.bucket)
+ err := d.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
@@ -237,14 +212,14 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (s *Plugin) Set(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_set")
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
// start writable transaction
- tx, err := s.DB.Begin(true)
+ tx, err := d.DB.Begin(true)
if err != nil {
return errors.E(op, err)
}
@@ -253,12 +228,12 @@ func (s *Plugin) Set(items ...kv.Item) error {
if err != nil {
errRb := tx.Rollback()
if errRb != nil {
- s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
}
}
}()
- b := tx.Bucket(s.bucket)
+ b := tx.Bucket(d.bucket)
// use access by index to avoid copying
for i := range items {
// performance note: pass a prepared bytes slice with initial cap
@@ -290,7 +265,7 @@ func (s *Plugin) Set(items ...kv.Item) error {
return errors.E(op, err)
}
// Store key TTL in the separate map
- s.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].TTL)
}
buf.Reset()
@@ -300,8 +275,8 @@ func (s *Plugin) Set(items ...kv.Item) error {
}
// Delete all keys from DB
-func (s *Plugin) Delete(keys ...string) error {
- const op = errors.Op("boltdb_plugin_delete")
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("boltdb_driver_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
}
@@ -315,7 +290,7 @@ func (s *Plugin) Delete(keys ...string) error {
}
// start writable transaction
- tx, err := s.DB.Begin(true)
+ tx, err := d.DB.Begin(true)
if err != nil {
return errors.E(op, err)
}
@@ -325,12 +300,12 @@ func (s *Plugin) Delete(keys ...string) error {
if err != nil {
errRb := tx.Rollback()
if errRb != nil {
- s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
}
}
}()
- b := tx.Bucket(s.bucket)
+ b := tx.Bucket(d.bucket)
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
@@ -347,8 +322,8 @@ func (s *Plugin) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Plugin) MExpire(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_mexpire")
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
@@ -360,13 +335,13 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
return errors.E(op, err)
}
- s.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].TTL)
}
return nil
}
-func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_ttl")
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
@@ -382,7 +357,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
m := make(map[string]interface{}, len(keys))
for i := range keys {
- if item, ok := s.gc.Load(keys[i]); ok {
+ if item, ok := d.gc.Load(keys[i]); ok {
// a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64
m[keys[i]] = item.(string)
}
@@ -390,67 +365,56 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
return m, nil
}
-// Close the DB connection
-func (s *Plugin) Close() error {
- // stop the keys GC
- s.stop <- struct{}{}
- return s.DB.Close()
-}
-
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
-// Name returns plugin name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
// ========================= PRIVATE =================================
-func (s *Plugin) gcPhase() {
- t := time.NewTicker(s.timeout)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- // calculate current time before loop started to be fair
- now := time.Now()
- s.gc.Range(func(key, value interface{}) bool {
- const op = errors.Op("boltdb_plugin_gc")
- k := key.(string)
- v, err := time.Parse(time.RFC3339, value.(string))
- if err != nil {
- return false
- }
+func (d *Driver) startGCLoop() { //nolint:gocognit
+ go func() {
+ t := time.NewTicker(d.timeout)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ // calculate current time before loop started to be fair
+ now := time.Now()
+ d.gc.Range(func(key, value interface{}) bool {
+ const op = errors.Op("boltdb_plugin_gc")
+ k := key.(string)
+ v, err := time.Parse(time.RFC3339, value.(string))
+ if err != nil {
+ return false
+ }
- if now.After(v) {
- // time expired
- s.gc.Delete(k)
- s.log.Debug("key deleted", "key", k)
- err := s.DB.Update(func(tx *bolt.Tx) error {
- b := tx.Bucket(s.bucket)
- if b == nil {
- return errors.E(op, errors.NoSuchBucket)
- }
- err := b.Delete([]byte(k))
+ if now.After(v) {
+ // time expired
+ d.gc.Delete(k)
+ d.log.Debug("key deleted", "key", k)
+ err := d.DB.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(d.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ err := b.Delete([]byte(k))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
if err != nil {
- return errors.E(op, err)
+ d.log.Error("error during the gc phase of update", "error", err)
+ // todo this error is ignored, it means, that timer still be active
+ // to prevent this, we need to invoke t.Stop()
+ return false
}
- return nil
- })
- if err != nil {
- s.log.Error("error during the gc phase of update", "error", err)
- // todo this error is ignored, it means, that timer still be active
- // to prevent this, we need to invoke t.Stop()
- return false
}
+ return true
+ })
+ case <-d.stop:
+ err := d.DB.Close()
+ if err != nil {
+ d.log.Error("error")
}
- return true
- })
- case <-s.stop:
- return
+ return
+ }
}
- }
+ }()
}
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
new file mode 100644
index 00000000..9d1e0dba
--- /dev/null
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -0,0 +1,65 @@
+package boltdb
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "boltdb"
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+
+ drivers uint
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.stop = make(chan struct{})
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ if s.drivers > 0 {
+ for i := uint(0); i < s.drivers; i++ {
+ // send close signal to every driver
+ s.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ s.drivers++
+
+ return st, nil
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
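
Design note on the Stop handling above: the plugin is now a thin factory that counts how many drivers it handed out via Provide and, on Stop, sends exactly that many signals on the shared channel so every driver goroutine receives one. A small runnable sketch of that fan-out, with made-up goroutines standing in for real drivers:

package main

import (
	"fmt"
	"sync"
)

// Illustration of the stop fan-out used by the boltdb/memory plugins above:
// one signal per provided driver on a shared unbuffered channel.
func main() {
	stop := make(chan struct{})
	var drivers uint

	wg := &sync.WaitGroup{}
	// pretend Provide() was called three times; each driver waits on stop
	for i := 0; i < 3; i++ {
		drivers++
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // each driver receives exactly one stop signal
			fmt.Println("driver stopped:", id)
		}(i)
	}

	// Stop(): one signal per provided driver
	for i := uint(0); i < drivers; i++ {
		stop <- struct{}{}
	}
	wg.Wait()
}
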
diff --git a/plugins/kv/memcached/config.go b/plugins/kv/drivers/memcached/config.go
index 7aad53b6..7aad53b6 100644
--- a/plugins/kv/memcached/config.go
+++ b/plugins/kv/drivers/memcached/config.go
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/drivers/memcached/driver.go
index b8392f9e..17b06fa0 100644
--- a/plugins/kv/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -11,68 +11,37 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "memcached"
-
-var EmptyItem = kv.Item{}
-
-type Plugin struct {
- // config
- cfg *Config
- // logger
- log logger.Logger
- // memcached client
+type Driver struct {
client *memcache.Client
+ log logger.Logger
+ cfg *Config
}
-// NewMemcachedClient returns a memcache client using the provided server(s)
+// NewMemcachedDriver returns a memcache client using the provided server(s)
// with equal weight. If a server is listed multiple times,
// it gets a proportional amount of weight.
-func NewMemcachedClient(url string) kv.Storage {
- m := memcache.New(url)
- return &Plugin{
- client: m,
- }
-}
+func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_memcached_driver")
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- const op = errors.Op("memcached_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
+ s := &Driver{
+ log: log,
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+
+ err := cfgPlugin.UnmarshalKey(key, &s.cfg)
if err != nil {
- return errors.E(op, err)
+ return nil, errors.E(op, err)
}
s.cfg.InitDefaults()
- s.log = log
- return nil
-}
+ m := memcache.New(s.cfg.Addr...)
+ s.client = m
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- s.client = memcache.New(s.cfg.Addr...)
- return errCh
-}
-
-// Memcached has no stop/close or smt similar to close the connection
-func (s *Plugin) Stop() error {
- return nil
-}
-
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
-// Name returns plugin user-friendly name
-func (s *Plugin) Name() string {
- return PluginName
+ return s, nil
}
// Has checks the key for existence
-func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("memcached_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -83,7 +52,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
- exist, err := s.client.Get(keys[i])
+ exist, err := d.client.Get(keys[i])
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
@@ -101,14 +70,14 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
-func (s *Plugin) Get(key string) ([]byte, error) {
+func (d *Driver) Get(key string) ([]byte, error) {
const op = errors.Op("memcached_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
- data, err := s.client.Get(key)
+ data, err := d.client.Get(key)
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err == memcache.ErrCacheMiss {
@@ -124,9 +93,9 @@ func (s *Plugin) Get(key string) ([]byte, error) {
return nil, nil
}
-// return map with key -- string
+// MGet return map with key -- string
// and map value as value -- []byte
-func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("memcached_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -143,7 +112,7 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
m := make(map[string]interface{}, len(keys))
for i := range keys {
// Here also MultiGet
- data, err := s.client.Get(keys[i])
+ data, err := d.client.Get(keys[i])
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err == memcache.ErrCacheMiss {
@@ -164,7 +133,7 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (s *Plugin) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...kv.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -193,7 +162,7 @@ func (s *Plugin) Set(items ...kv.Item) error {
memcachedItem.Expiration = int32(t.Unix())
}
- err := s.client.Set(memcachedItem)
+ err := d.client.Set(memcachedItem)
if err != nil {
return err
}
@@ -202,10 +171,10 @@ func (s *Plugin) Set(items ...kv.Item) error {
return nil
}
-// Expiration is the cache expiration time, in seconds: either a relative
+// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (s *Plugin) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...kv.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -223,7 +192,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
// into the future at which time the item will expire. Zero means the item has
// no expiration time. ErrCacheMiss is returned if the key is not in the cache.
// The key must be at most 250 bytes in length.
- err = s.client.Touch(items[i].Key, int32(t.Unix()))
+ err = d.client.Touch(items[i].Key, int32(t.Unix()))
if err != nil {
return errors.E(op, err)
}
@@ -231,13 +200,13 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
return nil
}
-// return time in seconds (int32) for a given keys
-func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
+// TTL return time in seconds (int32) for a given keys
+func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) {
const op = errors.Op("memcached_plugin_ttl")
return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
}
-func (s *Plugin) Delete(keys ...string) error {
+func (d *Driver) Delete(keys ...string) error {
const op = errors.Op("memcached_plugin_has")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -252,7 +221,7 @@ func (s *Plugin) Delete(keys ...string) error {
}
for i := range keys {
- err := s.client.Delete(keys[i])
+ err := d.client.Delete(keys[i])
// ErrCacheMiss means that a Get failed because the item wasn't present.
if err != nil {
// ErrCacheMiss means that a Get failed because the item wasn't present.
@@ -264,7 +233,3 @@ func (s *Plugin) Delete(keys ...string) error {
}
return nil
}
-
-func (s *Plugin) Close() error {
- return nil
-}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
new file mode 100644
index 00000000..af59e91b
--- /dev/null
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -0,0 +1,44 @@
+package memcached
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "memcached"
+
+var EmptyItem = kv.Item{}
+
+type Plugin struct {
+ // config plugin
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.cfgPlugin = cfg
+ s.log = log
+ return nil
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}
diff --git a/plugins/kv/memory/config.go b/plugins/kv/drivers/memory/config.go
index e51d09c5..e51d09c5 100644
--- a/plugins/kv/memory/config.go
+++ b/plugins/kv/drivers/memory/config.go
diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/drivers/memory/driver.go
index 4201a1c0..1e0d03d4 100644
--- a/plugins/kv/memory/plugin.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -11,53 +11,35 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-// PluginName is user friendly name for the plugin
-const PluginName = "memory"
-
-type Plugin struct {
- // heap is user map for the key-value pairs
+type Driver struct {
heap sync.Map
+ // stop is used to stop keys GC and close boltdb connection
stop chan struct{}
-
- log logger.Logger
- cfg *Config
+ log logger.Logger
+ cfg *Config
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("in_memory_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
+func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+ const op = errors.Op("new_in_memory_driver")
+
+ d := &Driver{
+ stop: stop,
+ log: log,
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
if err != nil {
- return errors.E(op, err)
+ return nil, errors.E(op, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.stop = make(chan struct{}, 1)
- return nil
-}
+ d.cfg.InitDefaults()
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- // start in-memory gc for kv
- go s.gc()
+ go d.gc()
- return errCh
+ return d, nil
}
-func (s *Plugin) Stop() error {
- const op = errors.Op("in_memory_plugin_stop")
- err := s.Close()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
+func (s *Driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in_memory_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -77,7 +59,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
return m, nil
}
-func (s *Plugin) Get(key string) ([]byte, error) {
+func (s *Driver) Get(key string) ([]byte, error) {
const op = errors.Op("in_memory_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -93,7 +75,7 @@ func (s *Plugin) Get(key string) ([]byte, error) {
return nil, nil
}
-func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
+func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -118,7 +100,7 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
return m, nil
}
-func (s *Plugin) Set(items ...kv.Item) error {
+func (s *Driver) Set(items ...kv.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -141,7 +123,7 @@ func (s *Plugin) Set(items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Plugin) MExpire(items ...kv.Item) error {
+func (s *Driver) MExpire(items ...kv.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -170,7 +152,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
return nil
}
-func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
+func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -194,7 +176,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
return m, nil
}
-func (s *Plugin) Delete(keys ...string) error {
+func (s *Driver) Delete(keys ...string) error {
const op = errors.Op("in_memory_plugin_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -214,26 +196,9 @@ func (s *Plugin) Delete(keys ...string) error {
return nil
}
-// Close clears the in-memory storage
-func (s *Plugin) Close() error {
- s.stop <- struct{}{}
- return nil
-}
-
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
-// Name returns plugin user-friendly name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
// ================================== PRIVATE ======================================
-func (s *Plugin) gc() {
- // TODO check
+func (s *Driver) gc() {
ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
for {
select {
diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go
new file mode 100644
index 00000000..acc6023d
--- /dev/null
+++ b/plugins/kv/drivers/memory/plugin.go
@@ -0,0 +1,64 @@
+package memory
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// PluginName is user friendly name for the plugin
+const PluginName = "memory"
+
+type Plugin struct {
+ // heap is user map for the key-value pairs
+ stop chan struct{}
+
+ log logger.Logger
+ cfgPlugin config.Configurer
+ drivers uint
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("in_memory_plugin_init")
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ s.log = log
+ s.cfgPlugin = cfg
+ s.stop = make(chan struct{}, 1)
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ if s.drivers > 0 {
+ for i := uint(0); i < s.drivers; i++ {
+ // send close signal to every driver
+ s.stop <- struct{}{}
+ }
+ }
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("inmemory_plugin_provide")
+ st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save driver number to release resources after Stop
+ s.drivers++
+
+ return st, nil
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go
new file mode 100644
index 00000000..41348236
--- /dev/null
+++ b/plugins/kv/drivers/redis/config.go
@@ -0,0 +1,34 @@
+package redis
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
new file mode 100644
index 00000000..d0b541b2
--- /dev/null
+++ b/plugins/kv/drivers/redis/driver.go
@@ -0,0 +1,239 @@
+package redis
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+var EmptyItem = kv.Item{}
+
+type Driver struct {
+ universalClient redis.UniversalClient
+ log logger.Logger
+ cfg *Config
+}
+
+func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ d := &Driver{
+ log: log,
+ }
+
+ // will be different for every connected driver
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.cfg.InitDefaults()
+ d.log = log
+
+ d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: d.cfg.Addrs,
+ DB: d.cfg.DB,
+ Username: d.cfg.Username,
+ Password: d.cfg.Password,
+ SentinelPassword: d.cfg.SentinelPassword,
+ MaxRetries: d.cfg.MaxRetries,
+ MinRetryBackoff: d.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: d.cfg.MaxRetryBackoff,
+ DialTimeout: d.cfg.DialTimeout,
+ ReadTimeout: d.cfg.ReadTimeout,
+ WriteTimeout: d.cfg.WriteTimeout,
+ PoolSize: d.cfg.PoolSize,
+ MinIdleConns: d.cfg.MinIdleConns,
+ MaxConnAge: d.cfg.MaxConnAge,
+ PoolTimeout: d.cfg.PoolTimeout,
+ IdleTimeout: d.cfg.IdleTimeout,
+ IdleCheckFrequency: d.cfg.IdleCheckFreq,
+ ReadOnly: d.cfg.ReadOnly,
+ RouteByLatency: d.cfg.RouteByLatency,
+ RouteRandomly: d.cfg.RouteRandomly,
+ MasterName: d.cfg.MasterName,
+ })
+
+ return d, nil
+}
+
+// Has checks if value exists.
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("redis_driver_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ exist, err := d.universalClient.Exists(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+ if exist == 1 {
+ m[key] = true
+ }
+ }
+ return m, nil
+}
+
+// Get loads key content into slice.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("redis_driver_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ return d.universalClient.Get(context.Background(), key).Bytes()
+}
+
+// MGet loads content of multiple values (some values might be skipped).
+// https://redis.io/commands/mget
+// Returns slice with the interfaces with values
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, k := range keys {
+ cmd := d.universalClient.Get(context.Background(), k)
+ if cmd.Err() != nil {
+ if cmd.Err() == redis.Nil {
+ continue
+ }
+ return nil, errors.E(op, cmd.Err())
+ }
+
+ m[k] = cmd.Val()
+ }
+
+ return m, nil
+}
+
+// Set sets value with the TTL in seconds
+// https://redis.io/commands/set
+// Redis `SET key value [expiration]` command.
+//
+// Use expiration for `SETEX`-like behavior.
+// Zero expiration means the key has no expiration time.
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+ now := time.Now()
+ for _, item := range items {
+ if item == EmptyItem {
+ return errors.E(op, errors.EmptyKey)
+ }
+
+ if item.TTL == "" {
+ err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
+ if err != nil {
+ return err
+ }
+ } else {
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+ err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// Delete one or multiple keys.
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("redis_driver_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+ return d.universalClient.Del(context.Background(), keys...).Err()
+}
+
+// MExpire https://redis.io/commands/expire
+// timeout in RFC3339
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_mexpire")
+ now := time.Now()
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+
+ // t guessed to be in future
+ // for Redis we use t.Sub, it will result in seconds, like 4.2s
+ d.universalClient.Expire(context.Background(), item.Key, t.Sub(now))
+ }
+
+ return nil
+}
+
+// TTL https://redis.io/commands/ttl
+// return time in seconds (float64) for a given keys
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ duration, err := d.universalClient.TTL(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = duration.Seconds()
+ }
+ return m, nil
+}
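
Across these drivers the TTL on kv.Item is an absolute RFC3339 timestamp; the redis driver turns it into a duration with t.Sub(now). A short sketch of what a caller would pass, using a local Item type that mirrors kv.Item from this diff (the key and value are made up):

package main

import (
	"fmt"
	"time"
)

// Item mirrors the kv.Item shape used throughout this diff.
type Item struct {
	Key   string
	Value string
	TTL   string
}

func main() {
	// a 10-second TTL expressed as an absolute RFC3339 deadline
	deadline := time.Now().Add(10 * time.Second).Format(time.RFC3339)
	item := Item{Key: "session:1", Value: "hello", TTL: deadline}

	// what the redis driver does internally with the TTL string
	t, err := time.Parse(time.RFC3339, item.TTL)
	if err != nil {
		panic(err)
	}
	fmt.Printf("SET %s with expiration %v\n", item.Key, time.Until(t).Round(time.Second))
}
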
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
new file mode 100644
index 00000000..d2183411
--- /dev/null
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -0,0 +1,51 @@
+package redis
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "redis"
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("redis_plugin_provide")
+ st, err := NewRedisDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index c1367cdf..20dbb8b3 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -35,7 +35,15 @@ type Storage interface {
// Delete one or multiple keys.
Delete(keys ...string) error
+}
+
+// StorageDriver interface provide storage
+type StorageDriver interface {
+ Provider
+}
- // Close closes the storage and underlying resources.
- Close() error
+// Provider provides storage based on the config
+type Provider interface {
+ // Provide provides Storage based on the config key
+ Provide(key string) (Storage, error)
}
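
With Close removed from Storage, consumers obtain storages through the new Provider contract; each driver plugin in this diff (boltdb, memcached, memory, redis) implements Provide as a thin constructor over a config key. A minimal sketch of a consumer of that contract; the stubProvider and the "kv.example" key are illustrative assumptions, not part of this diff.

package main

import "fmt"

// Storage lists a subset of the kv.Storage methods; the rest are omitted here.
type Storage interface {
	Has(keys ...string) (map[string]bool, error)
	Delete(keys ...string) error
}

// Provider matches the new kv.Provider contract from the hunk above.
type Provider interface {
	Provide(key string) (Storage, error)
}

type stubProvider struct{}

func (stubProvider) Provide(key string) (Storage, error) {
	return nil, fmt.Errorf("no storage configured under %q", key)
}

func main() {
	var p Provider = stubProvider{}
	if _, err := p.Provide("kv.example"); err != nil {
		fmt.Println(err)
	}
}
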
diff --git a/plugins/kv/memcached/plugin_unit_test.go b/plugins/kv/memcached/plugin_unit_test.go
deleted file mode 100644
index 31423627..00000000
--- a/plugins/kv/memcached/plugin_unit_test.go
+++ /dev/null
@@ -1,432 +0,0 @@
-package memcached
-
-import (
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/stretchr/testify/assert"
-)
-
-func initStorage() kv.Storage {
- return NewMemcachedClient("localhost:11211")
-}
-
-func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(keys...)
- if err != nil {
- t.Fatalf("error during cleanup: %s", err.Error())
- }
-}
-
-func TestStorage_Has(t *testing.T) {
- s := initStorage()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-}
-
-func TestStorage_Has_Set_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-}
-
-func TestStorage_Has_Set_MGet(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_Has_Set_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.Get("key")
- assert.NoError(t, err)
-
- if string(res) != "hello world" {
- t.Fatal("wrong value by key")
- }
-}
-
-func TestStorage_Set_Del_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- // check that keys are present
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-
- assert.NoError(t, s.Delete("key", "key2"))
- // check that keys are not present
- res, err = s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 0)
-}
-
-func TestStorage_Set_GetM(t *testing.T) {
- s := initStorage()
-
- defer func() {
- cleanup(t, s, "key", "key2")
-
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_MExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
- // set timeout to 5 sec
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- i1 := kv.Item{
- Key: "key",
- Value: "",
- TTL: nowPlusFive,
- }
- i2 := kv.Item{
- Key: "key2",
- Value: "",
- TTL: nowPlusFive,
- }
- assert.NoError(t, s.MExpire(i1, i2))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestNilAndWrongArgs(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- // check
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- _, err = s.Has("")
- assert.Error(t, err)
-
- _, err = s.Get("")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", "")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", " ")
- assert.Error(t, err)
-
- assert.Error(t, s.Set(kv.Item{}))
-
- err = s.Delete("")
- assert.Error(t, err)
-
- err = s.Delete("key", "")
- assert.Error(t, err)
-
- err = s.Delete("key", " ")
- assert.Error(t, err)
-
- err = s.Delete("key")
- assert.NoError(t, err)
-}
-
-func TestStorage_SetExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- // set timeout to 5 sec
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: nowPlusFive,
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: nowPlusFive,
- }))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestConcurrentReadWriteTransactions(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- wg := &sync.WaitGroup{}
- wg.Add(3)
-
- m := &sync.RWMutex{}
- // concurrently set the keys
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- // set is writable transaction
- // it should stop readable
- assert.NoError(t, s.Set(kv.Item{
- Key: "key" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }, kv.Item{
- Key: "key2" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }))
- m.Unlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.RLock()
- v, err = s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- m.RUnlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- err = s.Delete("key" + strconv.Itoa(i))
- assert.NoError(t, err)
- m.Unlock()
- }
- }(s)
-
- wg.Wait()
-}
diff --git a/plugins/kv/memory/plugin_unit_test.go b/plugins/kv/memory/plugin_unit_test.go
deleted file mode 100644
index 1965a696..00000000
--- a/plugins/kv/memory/plugin_unit_test.go
+++ /dev/null
@@ -1,472 +0,0 @@
-package memory
-
-import (
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/stretchr/testify/assert"
- "go.uber.org/zap"
-)
-
-func initStorage() kv.Storage {
- p := &Plugin{
- stop: make(chan struct{}),
- }
- p.cfg = &Config{
- Interval: 1,
- }
-
- l, _ := zap.NewDevelopment()
- p.log = logger.NewZapAdapter(l)
-
- go p.gc()
-
- return p
-}
-
-func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(keys...)
- if err != nil {
- t.Fatalf("error during cleanup: %s", err.Error())
- }
-}
-
-func TestStorage_Has(t *testing.T) {
- s := initStorage()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-}
-
-func TestStorage_Has_Set_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-}
-
-func TestStorage_Has_Set_MGet(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_Has_Set_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.Get("key")
- assert.NoError(t, err)
-
- if string(res) != "value" {
- t.Fatal("wrong value by key")
- }
-}
-
-func TestStorage_Set_Del_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- // check that keys are present
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-
- assert.NoError(t, s.Delete("key", "key2"))
- // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm
- res, err = s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 0)
-}
-
-func TestStorage_Set_GetM(t *testing.T) {
- s := initStorage()
-
- defer func() {
- cleanup(t, s, "key", "key2")
-
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_MExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
-
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
- // set timeout to 5 sec
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- i1 := kv.Item{
- Key: "key",
- Value: "",
- TTL: nowPlusFive,
- }
- i2 := kv.Item{
- Key: "key2",
- Value: "",
- TTL: nowPlusFive,
- }
- assert.NoError(t, s.MExpire(i1, i2))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestNilAndWrongArgs(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- // check
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- _, err = s.Has("")
- assert.Error(t, err)
-
- _, err = s.Get("")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", "")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", " ")
- assert.Error(t, err)
-
- assert.NoError(t, s.Set(kv.Item{}))
- _, err = s.Has("key")
- assert.NoError(t, err)
-
- err = s.Delete("")
- assert.Error(t, err)
-
- err = s.Delete("key", "")
- assert.Error(t, err)
-
- err = s.Delete("key", " ")
- assert.Error(t, err)
-
- err = s.Delete("key")
- assert.NoError(t, err)
-}
-
-func TestStorage_SetExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- // set timeout to 5 sec
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: nowPlusFive,
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: nowPlusFive,
- }))
-
- time.Sleep(time.Second * 2)
- m, err := s.TTL("key", "key2")
- assert.NoError(t, err)
-
- // remove a precision 4.02342342 -> 4
- keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- // remove a precision 4.02342342 -> 4
- key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- assert.True(t, keyTTL < 5)
- assert.True(t, key2TTL < 5)
-
- time.Sleep(time.Second * 4)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestConcurrentReadWriteTransactions(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- wg := &sync.WaitGroup{}
- wg.Add(3)
-
- m := &sync.RWMutex{}
- // concurrently set the keys
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- // set is writable transaction
- // it should stop readable
- assert.NoError(t, s.Set(kv.Item{
- Key: "key" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }, kv.Item{
- Key: "key2" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }))
- m.Unlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.RLock()
- v, err = s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- m.RUnlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- err = s.Delete("key" + strconv.Itoa(i))
- assert.NoError(t, err)
- m.Unlock()
- }
- }(s)
-
- wg.Wait()
-}
diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go
new file mode 100644
index 00000000..61bd6024
--- /dev/null
+++ b/plugins/kv/payload/generated/Item.go
@@ -0,0 +1,67 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package generated
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Item struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Item{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Item) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Item) Key() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Item) Value() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Item) Timeout() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func ItemStart(builder *flatbuffers.Builder) {
+ builder.StartObject(3)
+}
+func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0)
+}
+func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0)
+}
+func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0)
+}
+func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go
new file mode 100644
index 00000000..a2c6cfdb
--- /dev/null
+++ b/plugins/kv/payload/generated/Payload.go
@@ -0,0 +1,71 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package generated
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Payload struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Payload{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Payload) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Payload) Storage() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Payload) Items(obj *Item, j int) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *Payload) ItemsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func PayloadStart(builder *flatbuffers.Builder) {
+ builder.StartObject(2)
+}
+func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0)
+}
+func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0)
+}
+func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs
new file mode 100644
index 00000000..7e02c1a0
--- /dev/null
+++ b/plugins/kv/payload/payload.fbs
@@ -0,0 +1,14 @@
+namespace generated;
+
+table Payload {
+ Storage:string;
+ Items:[Item];
+}
+
+table Item {
+ Key:string;
+ Value:string;
+ Timeout:string;
+}
+
+root_type Payload;
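The RPC methods below consume these tables as a raw []byte. A client-side sketch of building such a payload with the generated builders; the flatbuffers calls are the standard Go API, and the storage name and item values are placeholders:

    package example

    import (
        flatbuffers "github.com/google/flatbuffers/go"

        "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
    )

    // buildPayload serializes one Item addressed to the "boltdb-south" storage.
    func buildPayload() []byte {
        builder := flatbuffers.NewBuilder(1024)

        // strings and child tables must be created before the parent table is started
        key := builder.CreateString("key")
        value := builder.CreateString("value")
        ttl := builder.CreateString("") // empty TTL, same convention as kv.Item

        generated.ItemStart(builder)
        generated.ItemAddKey(builder, key)
        generated.ItemAddValue(builder, value)
        generated.ItemAddTimeout(builder, ttl)
        item := generated.ItemEnd(builder)

        storage := builder.CreateString("boltdb-south")

        generated.PayloadStartItemsVector(builder, 1)
        builder.PrependUOffsetT(item)
        items := builder.EndVector(1)

        generated.PayloadStart(builder)
        generated.PayloadAddStorage(builder, storage)
        generated.PayloadAddItems(builder, items)
        builder.Finish(generated.PayloadEnd(builder))

        return builder.FinishedBytes()
    }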
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 751f0d12..4947dbe3 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -1,110 +1,221 @@
package kv
import (
+ "unsafe"
+
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
// Wrapper for the plugin
-type RPCServer struct {
+type rpc struct {
+ // all available storages
+ storages map[string]Storage
-	// svc is a plugin implementing Storage interface
+	// srv is the parent kv plugin
- svc Storage
+ srv *Plugin
// Logger
log logger.Logger
}
-// NewRPCServer construct RPC server for the particular plugin
-func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
- return &RPCServer{
- svc: srv,
- log: log,
+// Has accepts a []byte flatbuffers payload with Storage and Items
+func (r *rpc) Has(in []byte, res *map[string]bool) error {
+ const op = errors.Op("rpc_has")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+
+ keys := make([]string, 0, l)
+
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, strConvert(tmpItem.Key()))
}
-}
-// data Data
-func (r *RPCServer) Has(in []string, res *map[string]bool) error {
- const op = errors.Op("rpc server Has")
- ret, err := r.svc.Has(in...)
- if err != nil {
- return errors.E(op, err)
+ if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok {
+ ret, err := st.Has(keys...)
+ if err != nil {
+ return err
+ }
+
+ // update the value in the pointer
+ // save the result
+ *res = ret
+ return nil
}
- // update the value in the pointer
- *res = ret
- return nil
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in SetData
-func (r *RPCServer) Set(in []Item, ok *bool) error {
- const op = errors.Op("rpc server Set")
+// Set accepts a []byte flatbuffers payload with Storage and Items
+func (r *rpc) Set(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_set")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
- err := r.svc.Set(in...)
- if err != nil {
- return errors.E(op, err)
+ items := make([]Item, 0, dataRoot.ItemsLength())
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+
+ itc := Item{
+ Key: string(tmpItem.Key()),
+ Value: string(tmpItem.Value()),
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
}
- *ok = true
- return nil
-}
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Set(items...)
+ if err != nil {
+ return err
+ }
-// in Data
-func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server MGet")
- ret, err := r.svc.MGet(in...)
- if err != nil {
- return errors.E(op, err)
+ // save the result
+ *ok = true
+ return nil
}
- // update return value
- *res = ret
- return nil
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) MExpire(in []Item, ok *bool) error {
- const op = errors.Op("rpc server MExpire")
+// MGet accepts a []byte flatbuffers payload with Storage and Items
+func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+ const op = errors.Op("rpc_mget")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
- err := r.svc.MExpire(in...)
- if err != nil {
- return errors.E(op, err)
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
}
- *ok = true
- return nil
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ ret, err := st.MGet(keys...)
+ if err != nil {
+ return err
+ }
+
+ // save the result
+ *res = ret
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server TTL")
+// MExpire accepts a []byte flatbuffers payload with Storage and Items
+func (r *rpc) MExpire(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_mexpire")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+
+	// while unmarshalling the keys, fill up the items slice at the same time
+ items := make([]Item, 0, l)
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+
+ itc := Item{
+ Key: string(tmpItem.Key()),
+			// we only set a timeout on the keys, so the value here is redundant
+ Value: "",
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
+ }
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.MExpire(items...)
+ if err != nil {
+ return errors.E(op, err)
+ }
- ret, err := r.svc.TTL(in...)
- if err != nil {
- return errors.E(op, err)
+ // save the result
+ *ok = true
+ return nil
}
- *res = ret
- return nil
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) Delete(in []string, ok *bool) error {
- const op = errors.Op("rpc server Delete")
- err := r.svc.Delete(in...)
- if err != nil {
- return errors.E(op, err)
+// TTL accepts a []byte flatbuffers payload with Storage and Items
+func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+ const op = errors.Op("rpc_ttl")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
}
- *ok = true
- return nil
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ ret, err := st.TTL(keys...)
+ if err != nil {
+ return err
+ }
+
+ // save the result
+ *res = ret
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in string, storages
-func (r *RPCServer) Close(storage string, ok *bool) error {
- const op = errors.Op("rpc server Close")
- err := r.svc.Close()
- if err != nil {
- return errors.E(op, err)
+// Delete accepts a []byte flatbuffers payload with Storage and Items
+func (r *rpc) Delete(in []byte, ok *bool) error {
+	const op = errors.Op("rpc_delete")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
+ }
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Delete(keys...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // save the result
+ *ok = true
+ return nil
}
- *ok = true
- return nil
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
+
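+// strConvert reinterprets the byte slice as a string without copying; the slice must not be mutated afterwards.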
+func strConvert(s []byte) string {
+ return *(*string)(unsafe.Pointer(&s))
}
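These methods are exposed over net/rpc under the kv plugin name, so a client passes the flatbuffers buffer as the request argument. A hedged sketch that reuses buildPayload from the note above; the connected *rpc.Client is assumed to already exist (see the sketch after the rpc plugin diff at the end for constructing one):

    package example

    import (
        "net/rpc"
    )

    // hasKeys asks the target storage whether the serialized keys exist.
    // "kv.Has" follows from the kv plugin name under which the RPC service is registered.
    func hasKeys(client *rpc.Client) (map[string]bool, error) {
        res := make(map[string]bool)
        err := client.Call("kv.Has", buildPayload(), &res)
        return res, err
    }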
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
new file mode 100644
index 00000000..a9530f56
--- /dev/null
+++ b/plugins/kv/storage.go
@@ -0,0 +1,182 @@
+package kv
+
+import (
+ "fmt"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName string = "kv"
+
+const (
+	// driver is the mandatory field which should be present in every configured storage
+ driver string = "driver"
+
+ memcached string = "memcached"
+ boltdb string = "boltdb"
+ redis string = "redis"
+ memory string = "memory"
+)
+
+// Plugin for the unified storage
+type Plugin struct {
+ log logger.Logger
+ // drivers contains general storage drivers, such as boltdb, memory, memcached, redis.
+ drivers map[string]StorageDriver
+ // storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
+ storages map[string]Storage
+ // KV configuration
+ cfg Config
+ cfgPlugin config.Configurer
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("kv_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ p.drivers = make(map[string]StorageDriver, 5)
+ p.storages = make(map[string]Storage, 5)
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ const op = errors.Op("kv_plugin_serve")
+ // key - storage name in the config
+ // value - storage
+ /*
+		For example, we can have 2 storages here (not pre-configured yet):
+		boltdb and memcached.
+		We should provide the actual configs for all of the requested storages, e.g.:
+ kv:
+ boltdb-south:
+ driver: boltdb
+ dir: "tests/rr-bolt"
+ file: "rr.db"
+ bucket: "rr"
+ permissions: 777
+ ttl: 40s
+
+ boltdb-north:
+ driver: boltdb
+ dir: "tests/rr-bolt"
+ file: "rr.db"
+ bucket: "rr"
+ permissions: 777
+ ttl: 40s
+
+ memcached:
+ driver: memcached
+ addr: [ "localhost:11211" ]
+
+
+		For this config we have 2 drivers (boltdb and memcached) but 3 storages: boltdb-south, boltdb-north and memcached.
+		When the user requests, for example, boltdb-south, we should provide that particular preconfigured storage.
+ */
+ for k, v := range p.cfg.Data {
+ if _, ok := v.(map[string]interface{})[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+
+ // config key for the particular sub-driver
+ configKey := fmt.Sprintf("%s.%s", PluginName, k)
+		// at this point we know that the driver field is present in the configuration
+ switch v.(map[string]interface{})[driver] {
+ case memcached:
+ if _, ok := p.drivers[memcached]; !ok {
+ continue
+ }
+
+ storage, err := p.drivers[memcached].Provide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+
+ case boltdb:
+ if _, ok := p.drivers[boltdb]; !ok {
+ continue
+ }
+
+ storage, err := p.drivers[boltdb].Provide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case memory:
+ if _, ok := p.drivers[memory]; !ok {
+ continue
+ }
+
+ storage, err := p.drivers[memory].Provide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case redis:
+ if _, ok := p.drivers[redis]; !ok {
+ continue
+ }
+
+ storage, err := p.drivers[redis].Provide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+
+ default:
+ p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
+ }
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+// Collects will get all plugins which implement the StorageDriver interface
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetAllStorageDrivers,
+ }
+}
+
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) {
+ // save the storage driver
+ p.drivers[name.Name()] = storage
+}
+
+// RPC returns associated rpc service.
+func (p *Plugin) RPC() interface{} {
+ return &rpc{srv: p, log: p.log, storages: p.storages}
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
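The four cases of the switch in Serve differ only in the driver name. A possible simplification (a sketch, not part of this patch) is to resolve the driver by the string taken from the config; the one behavioural change is that an unknown driver name is skipped with a warning instead of being reported via log.Error:

    // body of the `for k, v := range p.cfg.Data` loop, replacing the switch
    drv, ok := v.(map[string]interface{})[driver].(string)
    if !ok {
        errCh <- errors.E(op, errors.Errorf("driver field in the %s storage is not a string", k))
        return errCh
    }

    d, registered := p.drivers[drv]
    if !registered {
        // matches the current behaviour of skipping storages whose driver plugin is not loaded
        p.log.Warn("no registered driver for the storage", "storage", k, "driver", drv)
        continue
    }

    storage, err := d.Provide(configKey)
    if err != nil {
        errCh <- errors.E(op, err)
        return errCh
    }
    p.storages[k] = storage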
diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go
index 909c8ca4..c0be6137 100644
--- a/plugins/redis/interface.go
+++ b/plugins/redis/interface.go
@@ -4,6 +4,6 @@ import "github.com/go-redis/redis/v8"
// Redis in the redis KV plugin interface
type Redis interface {
- // GetClient
+ // GetClient provides universal redis client
GetClient() redis.UniversalClient
}
diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go
index ee0deda6..43382e56 100644
--- a/plugins/resetter/plugin.go
+++ b/plugins/resetter/plugin.go
@@ -74,7 +74,7 @@ func (p *Plugin) Name() string {
return PluginName
}
-// RPCService returns associated rpc service.
+// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
return &rpc{srv: p, log: p.log}
}
diff --git a/plugins/rpc/interface.go b/plugins/rpc/interface.go
index 683fd2ec..eb6da9af 100644
--- a/plugins/rpc/interface.go
+++ b/plugins/rpc/interface.go
@@ -2,6 +2,6 @@ package rpc
// RPCer declares the ability to create set of public RPC methods.
type RPCer interface {
- // Provides RPC methods for the given service.
+	// RPC provides the set of public RPC methods for the given service.
RPC() interface{}
}
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index 94fec0b6..b80994d3 100644
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -13,7 +13,7 @@ import (
)
// PluginName contains default plugin name.
-const PluginName = "RPC"
+const PluginName = "rpc"
// Plugin is RPC service.
type Plugin struct {
@@ -23,7 +23,7 @@ type Plugin struct {
// set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC
plugins map[string]RPCer
listener net.Listener
- closed *uint32
+ closed uint32
}
// Init rpc service. Must return true if service is enabled.
@@ -40,13 +40,12 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
// Init defaults
s.cfg.InitDefaults()
// Init pluggable plugins map
- s.plugins = make(map[string]RPCer)
+ s.plugins = make(map[string]RPCer, 5)
// init logs
s.log = log
+
// set up state
- state := uint32(0)
- s.closed = &state
- atomic.StoreUint32(s.closed, 0)
+ atomic.StoreUint32(&s.closed, 0)
// validate config
err = s.cfg.Valid()
@@ -79,7 +78,7 @@ func (s *Plugin) Serve() chan error {
var err error
s.listener, err = s.cfg.Listener()
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
return errCh
}
@@ -89,7 +88,7 @@ func (s *Plugin) Serve() chan error {
for {
conn, err := s.listener.Accept()
if err != nil {
- if atomic.LoadUint32(s.closed) == 1 {
+ if atomic.LoadUint32(&s.closed) == 1 {
// just continue, this is not a critical issue, we just called Stop
return
}
@@ -110,7 +109,7 @@ func (s *Plugin) Serve() chan error {
func (s *Plugin) Stop() error {
const op = errors.Op("rpc_plugin_stop")
// store closed state
- atomic.StoreUint32(s.closed, 1)
+ atomic.StoreUint32(&s.closed, 1)
err := s.listener.Close()
if err != nil {
return errors.E(op, err)
@@ -123,7 +122,7 @@ func (s *Plugin) Name() string {
return PluginName
}
-// Depends declares services to collect for RPC.
+// Collects all plugins which implement the Named and RPCer interfaces
func (s *Plugin) Collects() []interface{} {
return []interface{}{
s.RegisterPlugin,
@@ -150,13 +149,3 @@ func (s *Plugin) Register(name string, svc interface{}) error {
return s.rpc.RegisterName(name, svc)
}
-
-// Client creates new RPC client.
-func (s *Plugin) Client() (*rpc.Client, error) {
- conn, err := s.cfg.Dialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
-}
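Since Plugin.Client is removed, callers now construct the RPC client themselves. A sketch of the equivalent wiring, assuming the goridge v3 codec package path (the address is a placeholder):

    package example

    import (
        "net"
        "net/rpc"

        // import path assumed; this is the codec the removed Client helper relied on
        goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
    )

    // newRPCClient dials the RoadRunner RPC endpoint and wraps the connection
    // with the goridge codec, mirroring what Plugin.Client used to do.
    func newRPCClient(addr string) (*rpc.Client, error) {
        conn, err := net.Dial("tcp", addr)
        if err != nil {
            return nil, err
        }
        return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
    }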