diff options
Diffstat (limited to 'tests/plugins/kv')
-rw-r--r-- | tests/plugins/kv/configs/.rr-boltdb.yaml | 16 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-in-memory.yaml | 12 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml | 19 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml | 18 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-kv-init.yaml | 35 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-memcached.yaml | 13 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-redis-global.yaml | 14 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-redis-no-config.yaml | 10 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-redis.yaml | 13 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 1517 |
10 files changed, 0 insertions, 1667 deletions
diff --git a/tests/plugins/kv/configs/.rr-boltdb.yaml b/tests/plugins/kv/configs/.rr-boltdb.yaml deleted file mode 100644 index 7a8aee4e..00000000 --- a/tests/plugins/kv/configs/.rr-boltdb.yaml +++ /dev/null @@ -1,16 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - boltdb-rr: - driver: boltdb - config: - dir: "." - file: "rr.db" - bucket: "test" - permissions: 0666 - interval: 1 # seconds diff --git a/tests/plugins/kv/configs/.rr-in-memory.yaml b/tests/plugins/kv/configs/.rr-in-memory.yaml deleted file mode 100644 index 0452d8bc..00000000 --- a/tests/plugins/kv/configs/.rr-in-memory.yaml +++ /dev/null @@ -1,12 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - memory-rr: - driver: memory - config: - interval: 1 diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml deleted file mode 100644 index 476369c5..00000000 --- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml +++ /dev/null @@ -1,19 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - boltdb-south: - driver: boltdb - config: - file: "rr.db" - permissions: 755 - - boltdb-africa: - driver: boltdb - config: - file: "africa.db" - permissions: 755 diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml deleted file mode 100644 index e7728972..00000000 --- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml +++ /dev/null @@ -1,18 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - boltdb-south: - driver: boltdb - config: - file: "rr.db" - - boltdb-africa: - driver: boltdb - config: - file: "africa.db" - permissions: 0777 diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml deleted file mode 100644 index 10cf6491..00000000 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ /dev/null @@ -1,35 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - default: - driver: memory - config: - interval: 60 - - boltdb-south: - driver: boltdb - config: - dir: "." - file: "rr.db" - bucket: "rr" - permissions: 0666 - interval: 1 - - boltdb-africa: - driver: boltdb - config: - dir: "." - file: "africa.db" - bucket: "rr" - permissions: 0666 - interval: 1 - - memcached: - driver: memcached - config: - addr: ["127.0.0.1:11211"] diff --git a/tests/plugins/kv/configs/.rr-memcached.yaml b/tests/plugins/kv/configs/.rr-memcached.yaml deleted file mode 100644 index ef8de2ab..00000000 --- a/tests/plugins/kv/configs/.rr-memcached.yaml +++ /dev/null @@ -1,13 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - memcached-rr: - driver: memcached - config: - addr: - - "127.0.0.1:11211" diff --git a/tests/plugins/kv/configs/.rr-redis-global.yaml b/tests/plugins/kv/configs/.rr-redis-global.yaml deleted file mode 100644 index 27377835..00000000 --- a/tests/plugins/kv/configs/.rr-redis-global.yaml +++ /dev/null @@ -1,14 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -redis-rr: - addrs: - - "127.0.0.1:6379" - -kv: - redis-rr: - driver: redis diff --git a/tests/plugins/kv/configs/.rr-redis-no-config.yaml b/tests/plugins/kv/configs/.rr-redis-no-config.yaml deleted file mode 100644 index 56113f13..00000000 --- a/tests/plugins/kv/configs/.rr-redis-no-config.yaml +++ /dev/null @@ -1,10 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - redis-rr: - driver: redis diff --git a/tests/plugins/kv/configs/.rr-redis.yaml b/tests/plugins/kv/configs/.rr-redis.yaml deleted file mode 100644 index f9b967d5..00000000 --- a/tests/plugins/kv/configs/.rr-redis.yaml +++ /dev/null @@ -1,13 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -logs: - mode: development - level: error - -kv: - redis-rr: - driver: redis - config: - addrs: - - "127.0.0.1:6379" diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go deleted file mode 100644 index c10e4726..00000000 --- a/tests/plugins/kv/storage_plugin_test.go +++ /dev/null @@ -1,1517 +0,0 @@ -package kv - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/boltdb" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memcached" - "github.com/spiral/roadrunner/v2/plugins/memory" - "github.com/spiral/roadrunner/v2/plugins/redis" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - payload "github.com/spiral/roadrunner/v2/proto/kv/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func TestKVInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-kv-init.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &memory.Plugin{}, - &boltdb.Plugin{}, - &memcached.Plugin{}, - &redis.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &kv.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("KvSetTest", kvSetTest) - t.Run("KvHasTest", kvHasTest) - - stopCh <- struct{}{} - - wg.Wait() - - _ = os.RemoveAll("rr.db") - _ = os.RemoveAll("africa.db") -} - -func TestKVNoInterval(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-kv-bolt-no-interval.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &boltdb.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &kv.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("KvSetTest", kvSetTest) - t.Run("KvHasTest", kvHasTest) - - stopCh <- struct{}{} - - wg.Wait() - - _ = os.RemoveAll("rr.db") - _ = os.RemoveAll("africa.db") -} - -func TestKVCreateToReopenWithPerms(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-kv-bolt-perms.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &boltdb.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &kv.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - stopCh <- struct{}{} - wg.Wait() -} - -func TestKVCreateToReopenWithPerms2(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-kv-bolt-perms.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &boltdb.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &kv.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("KvSetTest", kvSetTest) - t.Run("KvHasTest", kvHasTest) - - stopCh <- struct{}{} - - wg.Wait() - - _ = os.RemoveAll("rr.db") - _ = os.RemoveAll("africa.db") -} - -func kvSetTest(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - // WorkerList contains list of workers. - p := &payload.Request{ - Storage: "boltdb-south", - Items: []*payload.Item{ - { - Key: "key", - Value: []byte("val"), - }, - }, - } - - resp := &payload.Response{} - err = client.Call("kv.Set", p, resp) - assert.NoError(t, err) -} - -func kvHasTest(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - // WorkerList contains list of workers. - p := &payload.Request{ - Storage: "boltdb-south", - Items: []*payload.Item{ - { - Key: "key", - Value: []byte("val"), - }, - }, - } - - ret := &payload.Response{} - err = client.Call("kv.Has", p, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 1) -} - -func TestBoltDb(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-boltdb.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &kv.Plugin{}, - &boltdb.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("BOLTDB", testRPCMethods) - stopCh <- struct{}{} - wg.Wait() - - _ = os.Remove("rr.db") -} - -func testRPCMethods(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - // add 5 second ttl - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := &payload.Request{ - Storage: "boltdb-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }, - } - - data := &payload.Request{ - Storage: "boltdb-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - Timeout: tt, - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - ret := &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", data, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // should be 2 - - ret = &payload.Response{} - err = client.Call("kv.MGet", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // c is expired - - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - - data2 := &payload.Request{ - Storage: "boltdb-rr", - Items: []*payload.Item{ - { - Key: "a", - Timeout: tt2, - }, - { - Key: "b", - Timeout: tt2, - }, - { - Key: "d", - Timeout: tt2, - }, - }, - } - - // MEXPIRE - ret = &payload.Response{} - err = client.Call("kv.MExpire", data2, ret) - assert.NoError(t, err) - - // TTL - keys2 := &payload.Request{ - Storage: "boltdb-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "d", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.TTL", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = &payload.Response{} - err = client.Call("kv.Has", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - // DELETE - keysDel := &payload.Request{ - Storage: "boltdb-rr", - Items: []*payload.Item{ - { - Key: "e", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.Delete", keysDel, ret) - assert.NoError(t, err) - - // HAS AFTER DELETE - ret = &payload.Response{} - err = client.Call("kv.Has", keysDel, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - dataClear := &payload.Request{ - Storage: "boltdb-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - clear := &payload.Request{Storage: "boltdb-rr"} - - ret = &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", dataClear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 5) // should be 5 - - ret = &payload.Response{} - err = client.Call("kv.Clear", clear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) // should be 5 -} - -func TestMemcached(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-memcached.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &kv.Plugin{}, - &memcached.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("MEMCACHED", testRPCMethodsMemcached) - stopCh <- struct{}{} - wg.Wait() -} - -func testRPCMethodsMemcached(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - // add 5 second ttl - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - keys := &payload.Request{ - Storage: "memcached-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }, - } - - data := &payload.Request{ - Storage: "memcached-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - Timeout: tt, - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - ret := &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", data, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // should be 2 - - ret = &payload.Response{} - err = client.Call("kv.MGet", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // c is expired - - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - - data2 := &payload.Request{ - Storage: "memcached-rr", - Items: []*payload.Item{ - { - Key: "a", - Timeout: tt2, - }, - { - Key: "b", - Timeout: tt2, - }, - { - Key: "d", - Timeout: tt2, - }, - }, - } - - // MEXPIRE - ret = &payload.Response{} - err = client.Call("kv.MExpire", data2, ret) - assert.NoError(t, err) - - // TTL call is not supported for the memcached driver - keys2 := &payload.Request{ - Storage: "memcached-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "d", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.TTL", keys2, ret) - assert.Error(t, err) - assert.Len(t, ret.GetItems(), 0) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = &payload.Response{} - err = client.Call("kv.Has", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - // DELETE - keysDel := &payload.Request{ - Storage: "memcached-rr", - Items: []*payload.Item{ - { - Key: "e", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.Delete", keysDel, ret) - assert.NoError(t, err) - - // HAS AFTER DELETE - ret = &payload.Response{} - err = client.Call("kv.Has", keysDel, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - dataClear := &payload.Request{ - Storage: "memcached-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - clear := &payload.Request{Storage: "memcached-rr"} - - ret = &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", dataClear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 5) // should be 5 - - ret = &payload.Response{} - err = client.Call("kv.Clear", clear, ret) - assert.NoError(t, err) - - time.Sleep(time.Second * 2) - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) // should be 5 -} - -func TestInMemory(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-in-memory.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &kv.Plugin{}, - &memory.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("INMEMORY", testRPCMethodsInMemory) - stopCh <- struct{}{} - wg.Wait() -} - -func testRPCMethodsInMemory(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - // add 5 second ttl - - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := &payload.Request{ - Storage: "memory-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }, - } - - data := &payload.Request{ - Storage: "memory-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - Timeout: tt, - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - ret := &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", data, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // should be 2 - - ret = &payload.Response{} - err = client.Call("kv.MGet", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // c is expired - - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - - data2 := &payload.Request{ - Storage: "memory-rr", - Items: []*payload.Item{ - { - Key: "a", - Timeout: tt2, - }, - { - Key: "b", - Timeout: tt2, - }, - { - Key: "d", - Timeout: tt2, - }, - }, - } - - // MEXPIRE - ret = &payload.Response{} - err = client.Call("kv.MExpire", data2, ret) - assert.NoError(t, err) - - // TTL - keys2 := &payload.Request{ - Storage: "memory-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "d", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.TTL", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = &payload.Response{} - err = client.Call("kv.Has", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - // DELETE - keysDel := &payload.Request{ - Storage: "memory-rr", - Items: []*payload.Item{ - { - Key: "e", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.Delete", keysDel, ret) - assert.NoError(t, err) - - // HAS AFTER DELETE - ret = &payload.Response{} - err = client.Call("kv.Has", keysDel, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - dataClear := &payload.Request{ - Storage: "memory-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - clear := &payload.Request{Storage: "memory-rr"} - - ret = &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", dataClear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 5) // should be 5 - - ret = &payload.Response{} - err = client.Call("kv.Clear", clear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) // should be 5 -} - -func TestRedis(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-redis.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &kv.Plugin{}, - &redis.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("REDIS", testRPCMethodsRedis) - stopCh <- struct{}{} - wg.Wait() -} - -func TestRedisGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-redis-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &kv.Plugin{}, - &redis.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - t.Run("REDIS", testRPCMethodsRedis) - stopCh <- struct{}{} - wg.Wait() -} - -func TestRedisNoConfig(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-redis-no-config.yaml", // should be used default - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", []string{"kv"}).AnyTimes() - - mockLogger.EXPECT().Error(`can't find local or global configuration, this section will be skipped`, "local: ", "kv.redis-rr.config", "global: ", "redis-rr").Times(1) - - err = cont.RegisterAll( - cfg, - &kv.Plugin{}, - &redis.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - assert.NoError(t, err) -} - -func testRPCMethodsRedis(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - // add 5 second ttl - tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := &payload.Request{ - Storage: "redis-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }, - } - - data := &payload.Request{ - Storage: "redis-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - Timeout: tt, - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - ret := &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", data, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) // should be 3 - - // key "c" should be deleted - time.Sleep(time.Second * 7) - - ret = &payload.Response{} - err = client.Call("kv.Has", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // should be 2 - - ret = &payload.Response{} - err = client.Call("kv.MGet", keys, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 2) // c is expired - - tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - - data2 := &payload.Request{ - Storage: "redis-rr", - Items: []*payload.Item{ - { - Key: "a", - Timeout: tt2, - }, - { - Key: "b", - Timeout: tt2, - }, - { - Key: "d", - Timeout: tt2, - }, - }, - } - - // MEXPIRE - ret = &payload.Response{} - err = client.Call("kv.MExpire", data2, ret) - assert.NoError(t, err) - - // TTL - keys2 := &payload.Request{ - Storage: "redis-rr", - Items: []*payload.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "d", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.TTL", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 3) - - // HAS AFTER TTL - time.Sleep(time.Second * 15) - ret = &payload.Response{} - err = client.Call("kv.Has", keys2, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - // DELETE - keysDel := &payload.Request{ - Storage: "redis-rr", - Items: []*payload.Item{ - { - Key: "e", - }, - }, - } - - ret = &payload.Response{} - err = client.Call("kv.Delete", keysDel, ret) - assert.NoError(t, err) - - // HAS AFTER DELETE - ret = &payload.Response{} - err = client.Call("kv.Has", keysDel, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) - - dataClear := &payload.Request{ - Storage: "redis-rr", - Items: []*payload.Item{ - { - Key: "a", - Value: []byte("aa"), - }, - { - Key: "b", - Value: []byte("bb"), - }, - { - Key: "c", - Value: []byte("cc"), - }, - { - Key: "d", - Value: []byte("dd"), - }, - { - Key: "e", - Value: []byte("ee"), - }, - }, - } - - clear := &payload.Request{Storage: "redis-rr"} - - ret = &payload.Response{} - // Register 3 keys with values - err = client.Call("kv.Set", dataClear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 5) // should be 5 - - ret = &payload.Response{} - err = client.Call("kv.Clear", clear, ret) - assert.NoError(t, err) - - ret = &payload.Response{} - err = client.Call("kv.Has", dataClear, ret) - assert.NoError(t, err) - assert.Len(t, ret.GetItems(), 0) // should be 5 -} |