diff options
Diffstat (limited to 'plugins')
29 files changed, 1192 insertions, 1788 deletions
diff --git a/plugins/config/interface.go b/plugins/config/interface.go index 8370c0ab..59ad981f 100644 --- a/plugins/config/interface.go +++ b/plugins/config/interface.go @@ -1,7 +1,7 @@ package config type Configurer interface { - // // UnmarshalKey takes a single key and unmarshals it into a Struct. + // UnmarshalKey takes a single key and unmarshals it into a Struct. // // func (h *HttpService) Init(cp config.Configurer) error { // h.config := &HttpConfig{} diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go index 09fd35bb..58647eb8 100755 --- a/plugins/config/plugin.go +++ b/plugins/config/plugin.go @@ -23,7 +23,7 @@ type Viper struct { CommonConfig *General } -// Inits config provider. +// Init config provider. func (v *Viper) Init() error { const op = errors.Op("config_plugin_init") v.viper = viper.New() diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go deleted file mode 100644 index ad3843e7..00000000 --- a/plugins/kv/boltdb/plugin_unit_test.go +++ /dev/null @@ -1,531 +0,0 @@ -package boltdb - -import ( - "os" - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/stretchr/testify/assert" - bolt "go.etcd.io/bbolt" - "go.uber.org/zap" -) - -// NewBoltClient instantiate new BOLTDB client -// The parameters are: -// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created -// perm os.FileMode -- file permissions, for example 0777 -// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other -// bucket string -- name of the bucket to use, should be UTF-8 -func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_new_bolt_client") - db, err := bolt.Open(path, perm, options) - if err != nil { - return nil, errors.E(op, err) - } - - // bucket should be SET - if bucket == "" { - return nil, errors.E(op, errors.Str("bucket should be set")) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - // if TTL is not set, make it default - if ttl == 0 { - ttl = time.Minute - } - - l, _ := zap.NewDevelopment() - s := &Plugin{ - DB: db, - bucket: []byte(bucket), - stop: make(chan struct{}), - timeout: ttl, - gc: &sync.Map{}, - log: logger.NewZapAdapter(l), - } - - // start the TTL gc - go s.gcPhase() - - return s, nil -} - -func initStorage() kv.Storage { - storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second) - if err != nil { - panic(err) - } - return storage -} - -func cleanup(t *testing.T, path string) { - err := os.RemoveAll(path) - if err != nil { - t.Fatal(err) - } -} - -func TestStorage_Has(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) -} - -func TestStorage_Has_Set_Has(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) -} - -func TestConcurrentReadWriteTransactions(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - wg := &sync.WaitGroup{} - wg.Add(3) - - m := &sync.RWMutex{} - // concurrently set the keys - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - // set is writable transaction - // it should stop readable - assert.NoError(t, s.Set(kv.Item{ - Key: "key" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - }, kv.Item{ - Key: "key2" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - })) - m.Unlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.RLock() - v, err = s.Has("key") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - m.RUnlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - err = s.Delete("key" + strconv.Itoa(i)) - assert.NoError(t, err) - m.Unlock() - } - }(s) - - wg.Wait() -} - -func TestStorage_Has_Set_MGet(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_Has_Set_Get(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world2", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.Get("key") - assert.NoError(t, err) - - if string(res) != "hello world" { - t.Fatal("wrong value by key") - } -} - -func TestStorage_Set_Del_Get(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - // check that keys are present - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) - - assert.NoError(t, s.Delete("key", "key2")) - // check that keys are not present - res, err = s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 0) -} - -func TestStorage_Set_GetM(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestNilAndWrongArgs(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - // check - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - _, err = s.Has("") - assert.Error(t, err) - - _, err = s.Get("") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", "") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", " ") - assert.Error(t, err) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - })) - - assert.Error(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "asdf", - })) - - _, err = s.Has("key") - assert.NoError(t, err) - - assert.Error(t, s.Set(kv.Item{})) - - err = s.Delete("") - assert.Error(t, err) - - err = s.Delete("key", "") - assert.Error(t, err) - - err = s.Delete("key", " ") - assert.Error(t, err) - - err = s.Delete("key") - assert.NoError(t, err) -} - -func TestStorage_MExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - // set timeout to 5 sec - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - i1 := kv.Item{ - Key: "key", - Value: "", - TTL: nowPlusFive, - } - i2 := kv.Item{ - Key: "key2", - Value: "", - TTL: nowPlusFive, - } - assert.NoError(t, s.MExpire(i1, i2)) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestStorage_SetExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - cleanup(t, "rr.db") - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - // set timeout to 5 sec - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: nowPlusFive, - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: nowPlusFive, - })) - - time.Sleep(time.Second * 2) - m, err := s.TTL("key", "key2") - assert.NoError(t, err) - - // remove a precision 4.02342342 -> 4 - keyTTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - // remove a precision 4.02342342 -> 4 - key2TTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - assert.True(t, keyTTL < 5) - assert.True(t, key2TTL < 5) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} diff --git a/plugins/kv/config.go b/plugins/kv/config.go new file mode 100644 index 00000000..9ecae644 --- /dev/null +++ b/plugins/kv/config.go @@ -0,0 +1,5 @@ +package kv + +type Config struct { + Data map[string]interface{} `mapstructure:"kv"` +} diff --git a/plugins/kv/doc/kv.drawio b/plugins/kv/doc/kv.drawio new file mode 100644 index 00000000..04470e4a --- /dev/null +++ b/plugins/kv/doc/kv.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-04-22T21:31:28.320Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.5 Safari/537.36" etag="PMNN2QoTRBeugwC1WCGf" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7V1Zc6O4Fv41rp55sAsQi3lMHKeTmaQ7lXRmpp9uySDbTLDlBjmJ59dficWAJO8GvLmrYyMkAeccfTqLdGiAzujzawAnw0fsIr+hKe5nA9w0NE1TTI1+sZJZXKJqqhGXDALPTcqyghfvP5QUKknp1HNRWKhIMPaJNykWOng8Rg4plMEgwB/Fan3sF686gQMkFLw40BdL//ZcMoxLdbOdO3GHvMEwuTQASnLnI5jWTgrCIXTxR64IdBugE2BM4l+jzw7yGflSwsTtbhecnd9AgMZknQb62+30l3L1+b8/vk0ME0B/YipNI+kmJLP0kZFLKZAc4oAM8QCPod/NSq8DPB27iHWr0KOszgPGE1qo0sJ/ESGzhJ1wSjAtGpKRn5xFnx75J/f7J+uqZSRHN59Jz9HBLD0Yk2D2T1aRHf5M+2AHWbPoKG0XEhiQKyYKtGCMxygtu/V8f97CTWs4PgxDz4kLkyrsEn08JsnzqDo7pqc62MdBRDRgKtBWrajrAL+h3BntxjIVdpGY0Iy6CxmYMgNPAyep9T57/HDIt/G/Duh13bfO+633q5mOBBgMEFnCXXUuZnSEIjxClDC0XYB8SLz34n3AZKQM5vUyWaI/EnHaRLS0OkUrE6efeWmSitaZiIi0nrGjiERNKWXgLFdhgr0xCXM9P7ECWiGZGFTVTkAxmRfaBoddfAPQXtqA/ojvIRPX+cPsIMHgIsGHL8HgICTYAhtKMNdgZwleRsN36E8TQvz5Fz1+enj9ev9NEO6i6H4MPYJeJjBi0gfV74piysmGC1G778hkw3TaqNeXSNdC0XhHAUGfuSKRu3M26EWq2gkuKB+ZtqamKtkwp6il7fY+7bVrxYxtFKqm0lKUuRoVN1WNuZK1QK+Kjp5Q4FGyoUDEmASFMoARcCpDoTxUqftFEbCmqrQriuwkM6p9fFp4UV5WSUsJE1T5giKH1HZFkrLsLnOAfv394cfNdTmgjlTXQNIJ3zYtAM2SQB1oRVA3dQmoaxJQN8sC9VrH5/agDrhBatnbgPqaoL0K+/c4Vo2KQH0r1TA1TeaKnr1cM+Trq5pulW/bqOax2DY5xw+w9YLrp6Xoq9w/qyX6IGejdSWcYm6NeotWq4vnordsIik5n3MNeosh6C2P3cfOVeeue3PSqotl1q266LV6+JWWtgXSc8aoVv0oZUzmRUrtQRVpMpFSFLN7dbvfcW2vq+NUNa7l0mUKA1s+JzzAHvKLMgJ9bzBm7KC0YVPzNRtnngP9q+TEyHPdWBxR6P0He1F/jJeJCkY7N64bxo2EdQsG7jxQmHTWmEfn8ixaMo4Wjnyqb6sAgMLg1+OjzdRNQT8ESss0Cv02Nb2lmMV+cL8fUjnh+bkHTVGv1QtenOStbWb5A1ERjwhSrDrsJg1whlCbD2ivbFBFUEi3BcTTTxTxFpjPGeIZtm0WkWk/kGe3Uq0ltYmNlm2VAHjLhkmOwc9PtCfl+vVlryrqGuE0xuRceT/6lKO6agqvutprqq6lhVKMw5l4yrEu94jtqsQOXBwSrMsMVEU7sBUErRmk5NdMn973dS8osNz8NWULpCKJb4YR065oBVWffEYETM/TXwMyJ2lOZDYbk3R4aY40jOmaPdMoyWy0zTWimGUNvSVRjgKj9op9/baD5HTutQ3dUPZDV9VSDi2SkC5sPKJQgiQ8bLWPL5KweIXcwUYSOOm1VyjEfH0VJMO43EiCVatEbxdJ2GgJ6eHHEHaSbXXXNYC7xRAOyL1wBjGEnSRl0XRb0SoZQSV57t7cvxxY/EBigQssWT9+0F7XCCsvfnA06E4PxPVo2yH+HsfW2uuKakVhcV1Rao+53vtaBplBDTKJNRZ3wirTSn3oFDvpwNCBrgfzLbe91tt7dDa+ICVCfM3ifdDi6HmKpUfziPQbjhhujXvhJO6ph33i9ppUDBk8nN/zS4rcgA6XIO4nJs+FKpQqXkKSrAOCQmZ+3AZBk9Gp0PPZ04vqA4gnWBC0ImG6kCnDn6nzxuY1nlAXKuWLJigYeVSrx+Mw7syyrAtdqFZE/LgTXQlPnh5z97bwYKf5vDyzR2jkQGeI3OY0vKgqgqoyJ8+FMNTAcd1UXTGuC1OLjx3oD3HI5hxV1VS1cFMsNn7q5Dt3HAmQ610ARASQmC4XokTgcRGQhUVN+v9LHkZNYNlf6iOWFMoKF+MckOEQTlDkSyfM27jCsduDztsg8kh+nxLfG6Ok3IXB23fayiOR16+lMA9hukLLR/3oObiotcH+NWSbnKNPQ1ixE3/mD7GTc9hWistAVUWyTEADEucwKG2Jjipf7VuLezg9s7t7OHMIrxHR2T6OXWZEp6rsCbtJjxjSWYUsTjy2ovlu0PuN5S9ia/Oy799jkOMRSF24XqgUxF5yuUP21Z0Fdav27J0FUSv2A54FTUv3Gp4FFXfwMdand0o1g92WFhhqcWnB/Di/tECmPdqlKY9i1PkZOYipHfTJ/OnAG3+hLFPwx7gR7WPoe4P4mYnAG0oY0ihtswVTQcOEHyWxBwCOPbKVH5aEPVpp7NH3oZ0pGaYoGaxoymJk2QusxC1b9HPWY14vmoxmWxQqU2YwGmXJlLgB4CJTxy1T7ZSndclUCoC171XYja78XoX6UwdoMvP8+Alb/5pKTZxZnQBBwvQeijjKGH005r79g9F09H0BCOfHtKw1FdHSNB1NnJWW8INOMOwvgs4wp5aeGpd0g1Md1nU2l8clMdfEOY0aznyzldpHjXUa0wNHWJmgVzs9gKPZEnP4W1u0ZCPrykCILZeSijZBidtt47yezZfvrz/u6Jmb5/u/us9Vj68ygMzis3vWr+eKGR9OAcikelW1QFbr5qHL3r4sP9ihA6AujrjDTzKVS0xXz5totCpTStUrXVttoFdtzqneTiy9hW9pEBqseDGJptSSg0pUF+Qq4/HnoNKWznl0CFoqlxwvocSOKahUrWVxOajMlq0Xuykv6V4qhrKEqU2Wh4pfOne6+qEliVlUrMaciB90HQ9CtYQ9wgxftb99bt/6oXYM+mF6l5LkD6eLfLZshUa1A1R0uT52n792adHVwwP9++OuO2cBY8bf9z/uWCYO5fXb/e19lNx7cfLE2nyy5p4Y1rY4V4bEdajLGFaaTxYc0OunAPdZDrDLlmfXnw9claBExZaWVRFOb2VpaQaXwjDNRVaqHQREx93be+sOhtJBcPTGEFjw6qlcCnLD4BLyJvPmjtZQUwXtFtDs7NMuXibNF1lBOnIxdi5/w8EZsNsCls3xYTd2V2DYikHD11ASr03XSE1H/pVDcJ5ZEWOf6NxOPMyY1sOE4JGEmwRzah6OF1B15i8Ll+3C4ZZQ7UFPEMBRllpZFrotL6mX6F54gjMfQze3bXKDTLzCIjm1RU+9UL6x96tzW/+FJdq8L0Njje8JGtGv397QLF7iF99r9JMQ//cNtMmEkwcd36fqBCcjQKL8t6sUEkMUkmcUTvA4RBsQv1xVfjn0bkB+4Y3UsqikKiE/2Jz8bBsXZsMlA1ZqkQ4fscsmrO7/AQ==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/drivers/boltdb/config.go index ebe73c25..ebe73c25 100644 --- a/plugins/kv/boltdb/config.go +++ b/plugins/kv/drivers/boltdb/config.go diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/drivers/boltdb/driver.go index ffcbc85a..b596d4c3 100644 --- a/plugins/kv/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -16,23 +16,15 @@ import ( bolt "go.etcd.io/bbolt" ) -const PluginName = "boltdb" - -// BoltDB K/V storage. -type Plugin struct { +type Driver struct { // db instance DB *bolt.DB // name should be UTF-8 bucket []byte - - // config for RR integration - cfg *Config - - // logger - log logger.Logger - + log logger.Logger + cfg *Config // gc contains key which are contain timeouts - gc *sync.Map + gc sync.Map // default timeout for cache cleanup is 1 minute timeout time.Duration @@ -40,34 +32,38 @@ type Plugin struct { stop chan struct{} } -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - const op = errors.Op("boltdb_plugin_init") +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) { + const op = errors.Op("new_boltdb_driver") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) + d := &Driver{ + log: log, + stop: stop, } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfgPlugin.UnmarshalKey(key, &d.cfg) if err != nil { - return errors.E(op, errors.Disabled, err) + return nil, errors.E(op, err) } - // add default values - s.cfg.InitDefaults() + d.bucket = []byte(d.cfg.Bucket) + d.timeout = time.Duration(d.cfg.Interval) * time.Second + d.gc = sync.Map{} - // set the logger - s.log = log + // add default values + d.cfg.InitDefaults() - db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil) + db, err := bolt.Open(path.Join(d.cfg.Dir, d.cfg.File), os.FileMode(d.cfg.Permissions), nil) if err != nil { - return errors.E(op, err) + return nil, errors.E(op, err) } + d.DB = db + // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket)) + _, err = tx.CreateBucketIfNotExists([]byte(d.cfg.Bucket)) if err != nil { return errors.E(op, upOp) } @@ -75,38 +71,17 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { }) if err != nil { - return errors.E(op, err) + return nil, errors.E(op, err) } - s.DB = db - s.bucket = []byte(s.cfg.Bucket) - s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.Interval) * time.Second - s.gc = &sync.Map{} - - return nil -} - -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - // start the TTL gc - go s.gcPhase() - - return errCh -} + go d.startGCLoop() -func (s *Plugin) Stop() error { - const op = errors.Op("boltdb_plugin_stop") - err := s.Close() - if err != nil { - return errors.E(op, err) - } - return nil + return d, nil } -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("boltdb_plugin_has") - s.log.Debug("boltdb HAS method called", "args", keys) +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("boltdb_driver_has") + d.log.Debug("boltdb HAS method called", "args", keys) if keys == nil { return nil, errors.E(op, errors.NoKeys) } @@ -114,7 +89,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { m := make(map[string]bool, len(keys)) // this is readable transaction - err := s.DB.View(func(tx *bolt.Tx) error { + err := d.DB.View(func(tx *bolt.Tx) error { // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. @@ -123,7 +98,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { if keyTrimmed == "" { return errors.E(op, errors.EmptyKey) } - b := tx.Bucket(s.bucket) + b := tx.Bucket(d.bucket) if b == nil { return errors.E(op, errors.NoSuchBucket) } @@ -138,15 +113,15 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, err) } - s.log.Debug("boltdb HAS method finished") + d.log.Debug("boltdb HAS method finished") return m, nil } // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. -func (s *Plugin) Get(key string) ([]byte, error) { - const op = errors.Op("boltdb_plugin_get") +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("boltdb_driver_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) if keyTrimmed == "" { @@ -154,8 +129,8 @@ func (s *Plugin) Get(key string) ([]byte, error) { } var val []byte - err := s.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) + err := d.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) if b == nil { return errors.E(op, errors.NoSuchBucket) } @@ -185,8 +160,8 @@ func (s *Plugin) Get(key string) ([]byte, error) { return val, nil } -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("boltdb_plugin_mget") +func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("boltdb_driver_mget") // defense if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -202,8 +177,8 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { m := make(map[string]interface{}, len(keys)) - err := s.DB.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) + err := d.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) if b == nil { return errors.E(op, errors.NoSuchBucket) } @@ -237,14 +212,14 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (s *Plugin) Set(items ...kv.Item) error { - const op = errors.Op("boltdb_plugin_set") +func (d *Driver) Set(items ...kv.Item) error { + const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) } // start writable transaction - tx, err := s.DB.Begin(true) + tx, err := d.DB.Begin(true) if err != nil { return errors.E(op, err) } @@ -253,12 +228,12 @@ func (s *Plugin) Set(items ...kv.Item) error { if err != nil { errRb := tx.Rollback() if errRb != nil { - s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) } } }() - b := tx.Bucket(s.bucket) + b := tx.Bucket(d.bucket) // use access by index to avoid copying for i := range items { // performance note: pass a prepared bytes slice with initial cap @@ -290,7 +265,7 @@ func (s *Plugin) Set(items ...kv.Item) error { return errors.E(op, err) } // Store key TTL in the separate map - s.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].TTL) } buf.Reset() @@ -300,8 +275,8 @@ func (s *Plugin) Set(items ...kv.Item) error { } // Delete all keys from DB -func (s *Plugin) Delete(keys ...string) error { - const op = errors.Op("boltdb_plugin_delete") +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("boltdb_driver_delete") if keys == nil { return errors.E(op, errors.NoKeys) } @@ -315,7 +290,7 @@ func (s *Plugin) Delete(keys ...string) error { } // start writable transaction - tx, err := s.DB.Begin(true) + tx, err := d.DB.Begin(true) if err != nil { return errors.E(op, err) } @@ -325,12 +300,12 @@ func (s *Plugin) Delete(keys ...string) error { if err != nil { errRb := tx.Rollback() if errRb != nil { - s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + d.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) } } }() - b := tx.Bucket(s.bucket) + b := tx.Bucket(d.bucket) if b == nil { return errors.E(op, errors.NoSuchBucket) } @@ -347,8 +322,8 @@ func (s *Plugin) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Plugin) MExpire(items ...kv.Item) error { - const op = errors.Op("boltdb_plugin_mexpire") +func (d *Driver) MExpire(items ...kv.Item) error { + const op = errors.Op("boltdb_driver_mexpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) @@ -360,13 +335,13 @@ func (s *Plugin) MExpire(items ...kv.Item) error { return errors.E(op, err) } - s.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].TTL) } return nil } -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("boltdb_plugin_ttl") +func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("boltdb_driver_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) } @@ -382,7 +357,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { m := make(map[string]interface{}, len(keys)) for i := range keys { - if item, ok := s.gc.Load(keys[i]); ok { + if item, ok := d.gc.Load(keys[i]); ok { // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64 m[keys[i]] = item.(string) } @@ -390,67 +365,56 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { return m, nil } -// Close the DB connection -func (s *Plugin) Close() error { - // stop the keys GC - s.stop <- struct{}{} - return s.DB.Close() -} - -// RPCService returns associated rpc service. -func (s *Plugin) RPC() interface{} { - return kv.NewRPCServer(s, s.log) -} - -// Name returns plugin name -func (s *Plugin) Name() string { - return PluginName -} - // ========================= PRIVATE ================================= -func (s *Plugin) gcPhase() { - t := time.NewTicker(s.timeout) - defer t.Stop() - for { - select { - case <-t.C: - // calculate current time before loop started to be fair - now := time.Now() - s.gc.Range(func(key, value interface{}) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - return false - } +func (d *Driver) startGCLoop() { //nolint:gocognit + go func() { + t := time.NewTicker(d.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + d.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } - if now.After(v) { - // time expired - s.gc.Delete(k) - s.log.Debug("key deleted", "key", k) - err := s.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - err := b.Delete([]byte(k)) + if now.After(v) { + // time expired + d.gc.Delete(k) + d.log.Debug("key deleted", "key", k) + err := d.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) if err != nil { - return errors.E(op, err) + d.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false } - return nil - }) - if err != nil { - s.log.Error("error during the gc phase of update", "error", err) - // todo this error is ignored, it means, that timer still be active - // to prevent this, we need to invoke t.Stop() - return false } + return true + }) + case <-d.stop: + err := d.DB.Close() + if err != nil { + d.log.Error("error") } - return true - }) - case <-s.stop: - return + return + } } - } + }() } diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go new file mode 100644 index 00000000..9d1e0dba --- /dev/null +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -0,0 +1,65 @@ +package boltdb + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName = "boltdb" + +// Plugin BoltDB K/V storage. +type Plugin struct { + cfgPlugin config.Configurer + // logger + log logger.Logger + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} + + drivers uint +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + + s.stop = make(chan struct{}) + s.log = log + s.cfgPlugin = cfg + return nil +} + +// Serve is noop here +func (s *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (s *Plugin) Stop() error { + if s.drivers > 0 { + for i := uint(0); i < s.drivers; i++ { + // send close signal to every driver + s.stop <- struct{}{} + } + } + return nil +} + +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) + if err != nil { + return nil, errors.E(op, err) + } + + // save driver number to release resources after Stop + s.drivers++ + + return st, nil +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} diff --git a/plugins/kv/memcached/config.go b/plugins/kv/drivers/memcached/config.go index 7aad53b6..7aad53b6 100644 --- a/plugins/kv/memcached/config.go +++ b/plugins/kv/drivers/memcached/config.go diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/drivers/memcached/driver.go index b8392f9e..17b06fa0 100644 --- a/plugins/kv/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -11,68 +11,37 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "memcached" - -var EmptyItem = kv.Item{} - -type Plugin struct { - // config - cfg *Config - // logger - log logger.Logger - // memcached client +type Driver struct { client *memcache.Client + log logger.Logger + cfg *Config } -// NewMemcachedClient returns a memcache client using the provided server(s) +// NewMemcachedDriver returns a memcache client using the provided server(s) // with equal weight. If a server is listed multiple times, // it gets a proportional amount of weight. -func NewMemcachedClient(url string) kv.Storage { - m := memcache.New(url) - return &Plugin{ - client: m, - } -} +func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) { + const op = errors.Op("new_memcached_driver") -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - const op = errors.Op("memcached_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) + s := &Driver{ + log: log, } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + + err := cfgPlugin.UnmarshalKey(key, &s.cfg) if err != nil { - return errors.E(op, err) + return nil, errors.E(op, err) } s.cfg.InitDefaults() - s.log = log - return nil -} + m := memcache.New(s.cfg.Addr...) + s.client = m -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - s.client = memcache.New(s.cfg.Addr...) - return errCh -} - -// Memcached has no stop/close or smt similar to close the connection -func (s *Plugin) Stop() error { - return nil -} - -// RPCService returns associated rpc service. -func (s *Plugin) RPC() interface{} { - return kv.NewRPCServer(s, s.log) -} - -// Name returns plugin user-friendly name -func (s *Plugin) Name() string { - return PluginName + return s, nil } // Has checks the key for existence -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { +func (d *Driver) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("memcached_plugin_has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -83,7 +52,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { if keyTrimmed == "" { return nil, errors.E(op, errors.EmptyKey) } - exist, err := s.client.Get(keys[i]) + exist, err := d.client.Get(keys[i]) if err != nil { // ErrCacheMiss means that a Get failed because the item wasn't present. @@ -101,14 +70,14 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { // Get gets the item for the given key. ErrCacheMiss is returned for a // memcache cache miss. The key must be at most 250 bytes in length. -func (s *Plugin) Get(key string) ([]byte, error) { +func (d *Driver) Get(key string) ([]byte, error) { const op = errors.Op("memcached_plugin_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) if keyTrimmed == "" { return nil, errors.E(op, errors.EmptyKey) } - data, err := s.client.Get(key) + data, err := d.client.Get(key) if err != nil { // ErrCacheMiss means that a Get failed because the item wasn't present. if err == memcache.ErrCacheMiss { @@ -124,9 +93,9 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, nil } -// return map with key -- string +// MGet return map with key -- string // and map value as value -- []byte -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -143,7 +112,7 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { m := make(map[string]interface{}, len(keys)) for i := range keys { // Here also MultiGet - data, err := s.client.Get(keys[i]) + data, err := d.client.Get(keys[i]) if err != nil { // ErrCacheMiss means that a Get failed because the item wasn't present. if err == memcache.ErrCacheMiss { @@ -164,7 +133,7 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (s *Plugin) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...kv.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -193,7 +162,7 @@ func (s *Plugin) Set(items ...kv.Item) error { memcachedItem.Expiration = int32(t.Unix()) } - err := s.client.Set(memcachedItem) + err := d.client.Set(memcachedItem) if err != nil { return err } @@ -202,10 +171,10 @@ func (s *Plugin) Set(items ...kv.Item) error { return nil } -// Expiration is the cache expiration time, in seconds: either a relative +// MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (s *Plugin) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...kv.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -223,7 +192,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error { // into the future at which time the item will expire. Zero means the item has // no expiration time. ErrCacheMiss is returned if the key is not in the cache. // The key must be at most 250 bytes in length. - err = s.client.Touch(items[i].Key, int32(t.Unix())) + err = d.client.Touch(items[i].Key, int32(t.Unix())) if err != nil { return errors.E(op, err) } @@ -231,13 +200,13 @@ func (s *Plugin) MExpire(items ...kv.Item) error { return nil } -// return time in seconds (int32) for a given keys -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { +// TTL return time in seconds (int32) for a given keys +func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) { const op = errors.Op("memcached_plugin_ttl") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } -func (s *Plugin) Delete(keys ...string) error { +func (d *Driver) Delete(keys ...string) error { const op = errors.Op("memcached_plugin_has") if keys == nil { return errors.E(op, errors.NoKeys) @@ -252,7 +221,7 @@ func (s *Plugin) Delete(keys ...string) error { } for i := range keys { - err := s.client.Delete(keys[i]) + err := d.client.Delete(keys[i]) // ErrCacheMiss means that a Get failed because the item wasn't present. if err != nil { // ErrCacheMiss means that a Get failed because the item wasn't present. @@ -264,7 +233,3 @@ func (s *Plugin) Delete(keys ...string) error { } return nil } - -func (s *Plugin) Close() error { - return nil -} diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go new file mode 100644 index 00000000..af59e91b --- /dev/null +++ b/plugins/kv/drivers/memcached/plugin.go @@ -0,0 +1,44 @@ +package memcached + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName = "memcached" + +var EmptyItem = kv.Item{} + +type Plugin struct { + // config plugin + cfgPlugin config.Configurer + // logger + log logger.Logger +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + + s.cfgPlugin = cfg + s.log = log + return nil +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + if err != nil { + return nil, errors.E(op, err) + } + + return st, nil +} diff --git a/plugins/kv/memory/config.go b/plugins/kv/drivers/memory/config.go index e51d09c5..e51d09c5 100644 --- a/plugins/kv/memory/config.go +++ b/plugins/kv/drivers/memory/config.go diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/drivers/memory/driver.go index 4201a1c0..1e0d03d4 100644 --- a/plugins/kv/memory/plugin.go +++ b/plugins/kv/drivers/memory/driver.go @@ -11,53 +11,35 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -// PluginName is user friendly name for the plugin -const PluginName = "memory" - -type Plugin struct { - // heap is user map for the key-value pairs +type Driver struct { heap sync.Map + // stop is used to stop keys GC and close boltdb connection stop chan struct{} - - log logger.Logger - cfg *Config + log logger.Logger + cfg *Config } -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("in_memory_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) +func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) { + const op = errors.Op("new_in_memory_driver") + + d := &Driver{ + stop: stop, + log: log, } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) if err != nil { - return errors.E(op, err) + return nil, errors.E(op, err) } - s.cfg.InitDefaults() - s.log = log - - s.stop = make(chan struct{}, 1) - return nil -} + d.cfg.InitDefaults() -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - // start in-memory gc for kv - go s.gc() + go d.gc() - return errCh + return d, nil } -func (s *Plugin) Stop() error { - const op = errors.Op("in_memory_plugin_stop") - err := s.Close() - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { +func (s *Driver) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in_memory_plugin_has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -77,7 +59,7 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { return m, nil } -func (s *Plugin) Get(key string) ([]byte, error) { +func (s *Driver) Get(key string) ([]byte, error) { const op = errors.Op("in_memory_plugin_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -93,7 +75,7 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, nil } -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { +func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("in_memory_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -118,7 +100,7 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { return m, nil } -func (s *Plugin) Set(items ...kv.Item) error { +func (s *Driver) Set(items ...kv.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -141,7 +123,7 @@ func (s *Plugin) Set(items ...kv.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Plugin) MExpire(items ...kv.Item) error { +func (s *Driver) MExpire(items ...kv.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -170,7 +152,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error { return nil } -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { +func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("in_memory_plugin_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -194,7 +176,7 @@ func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { return m, nil } -func (s *Plugin) Delete(keys ...string) error { +func (s *Driver) Delete(keys ...string) error { const op = errors.Op("in_memory_plugin_delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -214,26 +196,9 @@ func (s *Plugin) Delete(keys ...string) error { return nil } -// Close clears the in-memory storage -func (s *Plugin) Close() error { - s.stop <- struct{}{} - return nil -} - -// RPCService returns associated rpc service. -func (s *Plugin) RPC() interface{} { - return kv.NewRPCServer(s, s.log) -} - -// Name returns plugin user-friendly name -func (s *Plugin) Name() string { - return PluginName -} - // ================================== PRIVATE ====================================== -func (s *Plugin) gc() { - // TODO check +func (s *Driver) gc() { ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) for { select { diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go new file mode 100644 index 00000000..acc6023d --- /dev/null +++ b/plugins/kv/drivers/memory/plugin.go @@ -0,0 +1,64 @@ +package memory + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// PluginName is user friendly name for the plugin +const PluginName = "memory" + +type Plugin struct { + // heap is user map for the key-value pairs + stop chan struct{} + + log logger.Logger + cfgPlugin config.Configurer + drivers uint +} + +func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("in_memory_plugin_init") + if !cfg.Has(kv.PluginName) { + return errors.E(op, errors.Disabled) + } + + s.log = log + s.cfgPlugin = cfg + s.stop = make(chan struct{}, 1) + return nil +} + +func (s *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (s *Plugin) Stop() error { + if s.drivers > 0 { + for i := uint(0); i < s.drivers; i++ { + // send close signal to every driver + s.stop <- struct{}{} + } + } + return nil +} + +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("inmemory_plugin_provide") + st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop) + if err != nil { + return nil, errors.E(op, err) + } + + // save driver number to release resources after Stop + s.drivers++ + + return st, nil +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go new file mode 100644 index 00000000..41348236 --- /dev/null +++ b/plugins/kv/drivers/redis/config.go @@ -0,0 +1,34 @@ +package redis + +import "time" + +type Config struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefaults initializing fill config with default values +func (s *Config) InitDefaults() { + if s.Addrs == nil { + s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage + } +} diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go new file mode 100644 index 00000000..d0b541b2 --- /dev/null +++ b/plugins/kv/drivers/redis/driver.go @@ -0,0 +1,239 @@ +package redis + +import ( + "context" + "strings" + "time" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +var EmptyItem = kv.Item{} + +type Driver struct { + universalClient redis.UniversalClient + log logger.Logger + cfg *Config +} + +func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) { + const op = errors.Op("new_boltdb_driver") + + d := &Driver{ + log: log, + } + + // will be different for every connected driver + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + d.cfg.InitDefaults() + d.log = log + + d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: d.cfg.Addrs, + DB: d.cfg.DB, + Username: d.cfg.Username, + Password: d.cfg.Password, + SentinelPassword: d.cfg.SentinelPassword, + MaxRetries: d.cfg.MaxRetries, + MinRetryBackoff: d.cfg.MaxRetryBackoff, + MaxRetryBackoff: d.cfg.MaxRetryBackoff, + DialTimeout: d.cfg.DialTimeout, + ReadTimeout: d.cfg.ReadTimeout, + WriteTimeout: d.cfg.WriteTimeout, + PoolSize: d.cfg.PoolSize, + MinIdleConns: d.cfg.MinIdleConns, + MaxConnAge: d.cfg.MaxConnAge, + PoolTimeout: d.cfg.PoolTimeout, + IdleTimeout: d.cfg.IdleTimeout, + IdleCheckFrequency: d.cfg.IdleCheckFreq, + ReadOnly: d.cfg.ReadOnly, + RouteByLatency: d.cfg.RouteByLatency, + RouteRandomly: d.cfg.RouteRandomly, + MasterName: d.cfg.MasterName, + }) + + return d, nil +} + +// Has checks if value exists. +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("redis_driver_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + m := make(map[string]bool, len(keys)) + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + exist, err := d.universalClient.Exists(context.Background(), key).Result() + if err != nil { + return nil, err + } + if exist == 1 { + m[key] = true + } + } + return m, nil +} + +// Get loads key content into slice. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("redis_driver_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + return d.universalClient.Get(context.Background(), key).Bytes() +} + +// MGet loads content of multiple values (some values might be skipped). +// https://redis.io/commands/mget +// Returns slice with the interfaces with values +func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("redis_driver_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for _, k := range keys { + cmd := d.universalClient.Get(context.Background(), k) + if cmd.Err() != nil { + if cmd.Err() == redis.Nil { + continue + } + return nil, errors.E(op, cmd.Err()) + } + + m[k] = cmd.Val() + } + + return m, nil +} + +// Set sets value with the TTL in seconds +// https://redis.io/commands/set +// Redis `SET key value [expiration]` command. +// +// Use expiration for `SETEX`-like behavior. +// Zero expiration means the key has no expiration time. +func (d *Driver) Set(items ...kv.Item) error { + const op = errors.Op("redis_driver_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + now := time.Now() + for _, item := range items { + if item == EmptyItem { + return errors.E(op, errors.EmptyKey) + } + + if item.TTL == "" { + err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() + if err != nil { + return err + } + } else { + t, err := time.Parse(time.RFC3339, item.TTL) + if err != nil { + return err + } + err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err() + if err != nil { + return err + } + } + } + return nil +} + +// Delete one or multiple keys. +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("redis_driver_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + return d.universalClient.Del(context.Background(), keys...).Err() +} + +// MExpire https://redis.io/commands/expire +// timeout in RFC3339 +func (d *Driver) MExpire(items ...kv.Item) error { + const op = errors.Op("redis_driver_mexpire") + now := time.Now() + for _, item := range items { + if item.TTL == "" || strings.TrimSpace(item.Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + t, err := time.Parse(time.RFC3339, item.TTL) + if err != nil { + return err + } + + // t guessed to be in future + // for Redis we use t.Sub, it will result in seconds, like 4.2s + d.universalClient.Expire(context.Background(), item.Key, t.Sub(now)) + } + + return nil +} + +// TTL https://redis.io/commands/ttl +// return time in seconds (float64) for a given keys +func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("redis_driver_ttl") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for _, key := range keys { + duration, err := d.universalClient.TTL(context.Background(), key).Result() + if err != nil { + return nil, err + } + + m[key] = duration.Seconds() + } + return m, nil +} diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go new file mode 100644 index 00000000..d2183411 --- /dev/null +++ b/plugins/kv/drivers/redis/plugin.go @@ -0,0 +1,51 @@ +package redis + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName = "redis" + +// Plugin BoltDB K/V storage. +type Plugin struct { + cfgPlugin config.Configurer + // logger + log logger.Logger +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + + s.log = log + s.cfgPlugin = cfg + return nil +} + +// Serve is noop here +func (s *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (s *Plugin) Stop() error { + return nil +} + +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("redis_plugin_provide") + st, err := NewRedisDriver(s.log, key, s.cfgPlugin) + if err != nil { + return nil, errors.E(op, err) + } + + return st, nil +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index c1367cdf..20dbb8b3 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -35,7 +35,15 @@ type Storage interface { // Delete one or multiple keys. Delete(keys ...string) error +} + +// StorageDriver interface provide storage +type StorageDriver interface { + Provider +} - // Close closes the storage and underlying resources. - Close() error +// Provider provides storage based on the config +type Provider interface { + // Provide provides Storage based on the config key + Provide(key string) (Storage, error) } diff --git a/plugins/kv/memcached/plugin_unit_test.go b/plugins/kv/memcached/plugin_unit_test.go deleted file mode 100644 index 31423627..00000000 --- a/plugins/kv/memcached/plugin_unit_test.go +++ /dev/null @@ -1,432 +0,0 @@ -package memcached - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/stretchr/testify/assert" -) - -func initStorage() kv.Storage { - return NewMemcachedClient("localhost:11211") -} - -func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(keys...) - if err != nil { - t.Fatalf("error during cleanup: %s", err.Error()) - } -} - -func TestStorage_Has(t *testing.T) { - s := initStorage() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) -} - -func TestStorage_Has_Set_Has(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) -} - -func TestStorage_Has_Set_MGet(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_Has_Set_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.Get("key") - assert.NoError(t, err) - - if string(res) != "hello world" { - t.Fatal("wrong value by key") - } -} - -func TestStorage_Set_Del_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - // check that keys are present - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) - - assert.NoError(t, s.Delete("key", "key2")) - // check that keys are not present - res, err = s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 0) -} - -func TestStorage_Set_GetM(t *testing.T) { - s := initStorage() - - defer func() { - cleanup(t, s, "key", "key2") - - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_MExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - // set timeout to 5 sec - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - i1 := kv.Item{ - Key: "key", - Value: "", - TTL: nowPlusFive, - } - i2 := kv.Item{ - Key: "key2", - Value: "", - TTL: nowPlusFive, - } - assert.NoError(t, s.MExpire(i1, i2)) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestNilAndWrongArgs(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key") - if err := s.Close(); err != nil { - panic(err) - } - }() - - // check - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - _, err = s.Has("") - assert.Error(t, err) - - _, err = s.Get("") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", "") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", " ") - assert.Error(t, err) - - assert.Error(t, s.Set(kv.Item{})) - - err = s.Delete("") - assert.Error(t, err) - - err = s.Delete("key", "") - assert.Error(t, err) - - err = s.Delete("key", " ") - assert.Error(t, err) - - err = s.Delete("key") - assert.NoError(t, err) -} - -func TestStorage_SetExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - // set timeout to 5 sec - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: nowPlusFive, - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: nowPlusFive, - })) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestConcurrentReadWriteTransactions(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - wg := &sync.WaitGroup{} - wg.Add(3) - - m := &sync.RWMutex{} - // concurrently set the keys - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - // set is writable transaction - // it should stop readable - assert.NoError(t, s.Set(kv.Item{ - Key: "key" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - }, kv.Item{ - Key: "key2" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - })) - m.Unlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.RLock() - v, err = s.Has("key") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - m.RUnlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - err = s.Delete("key" + strconv.Itoa(i)) - assert.NoError(t, err) - m.Unlock() - } - }(s) - - wg.Wait() -} diff --git a/plugins/kv/memory/plugin_unit_test.go b/plugins/kv/memory/plugin_unit_test.go deleted file mode 100644 index 1965a696..00000000 --- a/plugins/kv/memory/plugin_unit_test.go +++ /dev/null @@ -1,472 +0,0 @@ -package memory - -import ( - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func initStorage() kv.Storage { - p := &Plugin{ - stop: make(chan struct{}), - } - p.cfg = &Config{ - Interval: 1, - } - - l, _ := zap.NewDevelopment() - p.log = logger.NewZapAdapter(l) - - go p.gc() - - return p -} - -func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(keys...) - if err != nil { - t.Fatalf("error during cleanup: %s", err.Error()) - } -} - -func TestStorage_Has(t *testing.T) { - s := initStorage() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) -} - -func TestStorage_Has_Set_Has(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) -} - -func TestStorage_Has_Set_MGet(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_Has_Set_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - res, err := s.Get("key") - assert.NoError(t, err) - - if string(res) != "value" { - t.Fatal("wrong value by key") - } -} - -func TestStorage_Set_Del_Get(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - panic(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - // check that keys are present - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) - - assert.NoError(t, s.Delete("key", "key2")) - // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm - res, err = s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 0) -} - -func TestStorage_Set_GetM(t *testing.T) { - s := initStorage() - - defer func() { - cleanup(t, s, "key", "key2") - - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: "", - })) - - res, err := s.MGet("key", "key2") - assert.NoError(t, err) - assert.Len(t, res, 2) -} - -func TestStorage_MExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - // set timeout to 5 sec - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - i1 := kv.Item{ - Key: "key", - Value: "", - TTL: nowPlusFive, - } - i2 := kv.Item{ - Key: "key2", - Value: "", - TTL: nowPlusFive, - } - assert.NoError(t, s.MExpire(i1, i2)) - - time.Sleep(time.Second * 7) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestNilAndWrongArgs(t *testing.T) { - s := initStorage() - defer func() { - if err := s.Close(); err != nil { - panic(err) - } - }() - - // check - v, err := s.Has("key") - assert.NoError(t, err) - assert.False(t, v["key"]) - - _, err = s.Has("") - assert.Error(t, err) - - _, err = s.Get("") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.Get(" ") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", "") - assert.Error(t, err) - - _, err = s.MGet("key", "key2", " ") - assert.Error(t, err) - - assert.NoError(t, s.Set(kv.Item{})) - _, err = s.Has("key") - assert.NoError(t, err) - - err = s.Delete("") - assert.Error(t, err) - - err = s.Delete("key", "") - assert.Error(t, err) - - err = s.Delete("key", " ") - assert.Error(t, err) - - err = s.Delete("key") - assert.NoError(t, err) -} - -func TestStorage_SetExpire_TTL(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - // ensure that storage is clean - v, err := s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, - kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - // set timeout to 5 sec - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "value", - TTL: nowPlusFive, - }, - kv.Item{ - Key: "key2", - Value: "value", - TTL: nowPlusFive, - })) - - time.Sleep(time.Second * 2) - m, err := s.TTL("key", "key2") - assert.NoError(t, err) - - // remove a precision 4.02342342 -> 4 - keyTTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - // remove a precision 4.02342342 -> 4 - key2TTL, err := strconv.Atoi(m["key"].(string)[0:1]) - if err != nil { - t.Fatal(err) - } - - assert.True(t, keyTTL < 5) - assert.True(t, key2TTL < 5) - - time.Sleep(time.Second * 4) - - // ensure that storage is clean - v, err = s.Has("key", "key2") - assert.NoError(t, err) - assert.False(t, v["key"]) - assert.False(t, v["key2"]) -} - -func TestConcurrentReadWriteTransactions(t *testing.T) { - s := initStorage() - defer func() { - cleanup(t, s, "key", "key2") - if err := s.Close(); err != nil { - t.Fatal(err) - } - }() - - v, err := s.Has("key") - assert.NoError(t, err) - // no such key - assert.False(t, v["key"]) - - assert.NoError(t, s.Set(kv.Item{ - Key: "key", - Value: "hello world", - TTL: "", - }, kv.Item{ - Key: "key2", - Value: "hello world", - TTL: "", - })) - - v, err = s.Has("key", "key2") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - assert.True(t, v["key2"]) - - wg := &sync.WaitGroup{} - wg.Add(3) - - m := &sync.RWMutex{} - // concurrently set the keys - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - // set is writable transaction - // it should stop readable - assert.NoError(t, s.Set(kv.Item{ - Key: "key" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - }, kv.Item{ - Key: "key2" + strconv.Itoa(i), - Value: "hello world" + strconv.Itoa(i), - TTL: "", - })) - m.Unlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.RLock() - v, err = s.Has("key") - assert.NoError(t, err) - // no such key - assert.True(t, v["key"]) - m.RUnlock() - } - }(s) - - // should be no errors - go func(s kv.Storage) { - defer wg.Done() - for i := 0; i <= 100; i++ { - m.Lock() - err = s.Delete("key" + strconv.Itoa(i)) - assert.NoError(t, err) - m.Unlock() - } - }(s) - - wg.Wait() -} diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go new file mode 100644 index 00000000..61bd6024 --- /dev/null +++ b/plugins/kv/payload/generated/Item.go @@ -0,0 +1,67 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package generated + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Item struct { + _tab flatbuffers.Table +} + +func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Item{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Item) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Item) Key() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Item) Value() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Item) Timeout() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func ItemStart(builder *flatbuffers.Builder) { + builder.StartObject(3) +} +func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0) +} +func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0) +} +func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0) +} +func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go new file mode 100644 index 00000000..a2c6cfdb --- /dev/null +++ b/plugins/kv/payload/generated/Payload.go @@ -0,0 +1,71 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package generated + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Payload struct { + _tab flatbuffers.Table +} + +func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Payload{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Payload) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Payload) Storage() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Payload) Items(obj *Item, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Payload) ItemsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func PayloadStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0) +} +func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0) +} +func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs new file mode 100644 index 00000000..7e02c1a0 --- /dev/null +++ b/plugins/kv/payload/payload.fbs @@ -0,0 +1,14 @@ +namespace generated; + +table Payload { + Storage:string; + Items:[Item]; +} + +table Item { + Key:string; + Value:string; + Timeout:string; +} + +root_type Payload; diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 751f0d12..4947dbe3 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -1,110 +1,221 @@ package kv import ( + "unsafe" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" "github.com/spiral/roadrunner/v2/plugins/logger" ) // Wrapper for the plugin -type RPCServer struct { +type rpc struct { + // all available storages + storages map[string]Storage // svc is a plugin implementing Storage interface - svc Storage + srv *Plugin // Logger log logger.Logger } -// NewRPCServer construct RPC server for the particular plugin -func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { - return &RPCServer{ - svc: srv, - log: log, +// Has accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Has(in []byte, res *map[string]bool) error { + const op = errors.Op("rpc_has") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + keys := make([]string, 0, l) + + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, strConvert(tmpItem.Key())) } -} -// data Data -func (r *RPCServer) Has(in []string, res *map[string]bool) error { - const op = errors.Op("rpc server Has") - ret, err := r.svc.Has(in...) - if err != nil { - return errors.E(op, err) + if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok { + ret, err := st.Has(keys...) + if err != nil { + return err + } + + // update the value in the pointer + // save the result + *res = ret + return nil } - // update the value in the pointer - *res = ret - return nil + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in SetData -func (r *RPCServer) Set(in []Item, ok *bool) error { - const op = errors.Op("rpc server Set") +// Set accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Set(in []byte, ok *bool) error { + const op = errors.Op("rpc_set") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() - err := r.svc.Set(in...) - if err != nil { - return errors.E(op, err) + items := make([]Item, 0, dataRoot.ItemsLength()) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + Value: string(tmpItem.Value()), + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) } - *ok = true - return nil -} + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Set(items...) + if err != nil { + return err + } -// in Data -func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server MGet") - ret, err := r.svc.MGet(in...) - if err != nil { - return errors.E(op, err) + // save the result + *ok = true + return nil } - // update return value - *res = ret - return nil + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) MExpire(in []Item, ok *bool) error { - const op = errors.Op("rpc server MExpire") +// MGet accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_mget") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} - err := r.svc.MExpire(in...) - if err != nil { - return errors.E(op, err) + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) } - *ok = true - return nil + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.MGet(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server TTL") +// MExpire accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MExpire(in []byte, ok *bool) error { + const op = errors.Op("rpc_mexpire") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + // when unmarshalling the keys, simultaneously, fill up the slice with items + items := make([]Item, 0, l) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + // we set up timeout on the keys, so, value here is redundant + Value: "", + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.MExpire(items...) + if err != nil { + return errors.E(op, err) + } - ret, err := r.svc.TTL(in...) - if err != nil { - return errors.E(op, err) + // save the result + *ok = true + return nil } - *res = ret - return nil + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) Delete(in []string, ok *bool) error { - const op = errors.Op("rpc server Delete") - err := r.svc.Delete(in...) - if err != nil { - return errors.E(op, err) +// TTL accept []byte flatbuffers payload with Storage and Item +func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_ttl") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) } - *ok = true - return nil + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.TTL(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in string, storages -func (r *RPCServer) Close(storage string, ok *bool) error { - const op = errors.Op("rpc server Close") - err := r.svc.Close() - if err != nil { - return errors.E(op, err) +// Delete accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Delete(in []byte, ok *bool) error { + const op = errors.Op("rcp_delete") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Delete(keys...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil } - *ok = true - return nil + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +func strConvert(s []byte) string { + return *(*string)(unsafe.Pointer(&s)) } diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go new file mode 100644 index 00000000..a9530f56 --- /dev/null +++ b/plugins/kv/storage.go @@ -0,0 +1,182 @@ +package kv + +import ( + "fmt" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName string = "kv" + +const ( + // driver is the mandatory field which should present in every storage + driver string = "driver" + + memcached string = "memcached" + boltdb string = "boltdb" + redis string = "redis" + memory string = "memory" +) + +// Plugin for the unified storage +type Plugin struct { + log logger.Logger + // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. + drivers map[string]StorageDriver + // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. + storages map[string]Storage + // KV configuration + cfg Config + cfgPlugin config.Configurer +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("kv_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) + if err != nil { + return errors.E(op, err) + } + p.drivers = make(map[string]StorageDriver, 5) + p.storages = make(map[string]Storage, 5) + p.log = log + p.cfgPlugin = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + const op = errors.Op("kv_plugin_serve") + // key - storage name in the config + // value - storage + /* + For example we can have here 2 storages (but they are not pre-configured) + for the boltdb and memcached + We should provide here the actual configs for the all requested storages + kv: + boltdb-south: + driver: boltdb + dir: "tests/rr-bolt" + file: "rr.db" + bucket: "rr" + permissions: 777 + ttl: 40s + + boltdb-north: + driver: boltdb + dir: "tests/rr-bolt" + file: "rr.db" + bucket: "rr" + permissions: 777 + ttl: 40s + + memcached: + driver: memcached + addr: [ "localhost:11211" ] + + + For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + when user requests for example boltdb-south, we should provide that particular preconfigured storage + */ + for k, v := range p.cfg.Data { + if _, ok := v.(map[string]interface{})[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + + // config key for the particular sub-driver + configKey := fmt.Sprintf("%s.%s", PluginName, k) + // at this point we know, that driver field present in the cofiguration + switch v.(map[string]interface{})[driver] { + case memcached: + if _, ok := p.drivers[memcached]; !ok { + continue + } + + storage, err := p.drivers[memcached].Provide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + + case boltdb: + if _, ok := p.drivers[boltdb]; !ok { + continue + } + + storage, err := p.drivers[boltdb].Provide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + case memory: + if _, ok := p.drivers[memory]; !ok { + continue + } + + storage, err := p.drivers[memory].Provide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + case redis: + if _, ok := p.drivers[redis]; !ok { + continue + } + + storage, err := p.drivers[redis].Provide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + + default: + p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) + } + } + + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +// Collects will get all plugins which implement Storage interface +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.GetAllStorageDrivers, + } +} + +func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) { + // save the storage driver + p.drivers[name.Name()] = storage +} + +// RPC returns associated rpc service. +func (p *Plugin) RPC() interface{} { + return &rpc{srv: p, log: p.log, storages: p.storages} +} + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go index 909c8ca4..c0be6137 100644 --- a/plugins/redis/interface.go +++ b/plugins/redis/interface.go @@ -4,6 +4,6 @@ import "github.com/go-redis/redis/v8" // Redis in the redis KV plugin interface type Redis interface { - // GetClient + // GetClient provides universal redis client GetClient() redis.UniversalClient } diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go index ee0deda6..43382e56 100644 --- a/plugins/resetter/plugin.go +++ b/plugins/resetter/plugin.go @@ -74,7 +74,7 @@ func (p *Plugin) Name() string { return PluginName } -// RPCService returns associated rpc service. +// RPC returns associated rpc service. func (p *Plugin) RPC() interface{} { return &rpc{srv: p, log: p.log} } diff --git a/plugins/rpc/interface.go b/plugins/rpc/interface.go index 683fd2ec..eb6da9af 100644 --- a/plugins/rpc/interface.go +++ b/plugins/rpc/interface.go @@ -2,6 +2,6 @@ package rpc // RPCer declares the ability to create set of public RPC methods. type RPCer interface { - // Provides RPC methods for the given service. + // RPC Provides methods for the given service. RPC() interface{} } diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 94fec0b6..b80994d3 100644 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -13,7 +13,7 @@ import ( ) // PluginName contains default plugin name. -const PluginName = "RPC" +const PluginName = "rpc" // Plugin is RPC service. type Plugin struct { @@ -23,7 +23,7 @@ type Plugin struct { // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC plugins map[string]RPCer listener net.Listener - closed *uint32 + closed uint32 } // Init rpc service. Must return true if service is enabled. @@ -40,13 +40,12 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { // Init defaults s.cfg.InitDefaults() // Init pluggable plugins map - s.plugins = make(map[string]RPCer) + s.plugins = make(map[string]RPCer, 5) // init logs s.log = log + // set up state - state := uint32(0) - s.closed = &state - atomic.StoreUint32(s.closed, 0) + atomic.StoreUint32(&s.closed, 0) // validate config err = s.cfg.Valid() @@ -79,7 +78,7 @@ func (s *Plugin) Serve() chan error { var err error s.listener, err = s.cfg.Listener() if err != nil { - errCh <- err + errCh <- errors.E(op, err) return errCh } @@ -89,7 +88,7 @@ func (s *Plugin) Serve() chan error { for { conn, err := s.listener.Accept() if err != nil { - if atomic.LoadUint32(s.closed) == 1 { + if atomic.LoadUint32(&s.closed) == 1 { // just continue, this is not a critical issue, we just called Stop return } @@ -110,7 +109,7 @@ func (s *Plugin) Serve() chan error { func (s *Plugin) Stop() error { const op = errors.Op("rpc_plugin_stop") // store closed state - atomic.StoreUint32(s.closed, 1) + atomic.StoreUint32(&s.closed, 1) err := s.listener.Close() if err != nil { return errors.E(op, err) @@ -123,7 +122,7 @@ func (s *Plugin) Name() string { return PluginName } -// Depends declares services to collect for RPC. +// Collects all plugins which implement Name + RPCer interfaces func (s *Plugin) Collects() []interface{} { return []interface{}{ s.RegisterPlugin, @@ -150,13 +149,3 @@ func (s *Plugin) Register(name string, svc interface{}) error { return s.rpc.RegisterName(name, svc) } - -// Client creates new RPC client. -func (s *Plugin) Client() (*rpc.Client, error) { - conn, err := s.cfg.Dialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil -} |