diff options
Diffstat (limited to 'plugins/kv')
-rw-r--r-- | plugins/kv/config.go | 6 | ||||
-rw-r--r-- | plugins/kv/doc/kv.drawio | 1 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 159 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 180 |
4 files changed, 0 insertions, 346 deletions
diff --git a/plugins/kv/config.go b/plugins/kv/config.go deleted file mode 100644 index 09ba79cd..00000000 --- a/plugins/kv/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package kv - -// Config represents general storage configuration with keys as the user defined kv-names and values as the constructors -type Config struct { - Data map[string]interface{} `mapstructure:"kv"` -} diff --git a/plugins/kv/doc/kv.drawio b/plugins/kv/doc/kv.drawio deleted file mode 100644 index 04470e4a..00000000 --- a/plugins/kv/doc/kv.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-04-22T21:31:28.320Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.5 Safari/537.36" etag="PMNN2QoTRBeugwC1WCGf" version="14.5.1" type="device"><diagram id="2us8W0xnLog_cmX3fgYy" name="Page-1">7V1Zc6O4Fv41rp55sAsQi3lMHKeTmaQ7lXRmpp9uySDbTLDlBjmJ59dficWAJO8GvLmrYyMkAeccfTqLdGiAzujzawAnw0fsIr+hKe5nA9w0NE1TTI1+sZJZXKJqqhGXDALPTcqyghfvP5QUKknp1HNRWKhIMPaJNykWOng8Rg4plMEgwB/Fan3sF686gQMkFLw40BdL//ZcMoxLdbOdO3GHvMEwuTQASnLnI5jWTgrCIXTxR64IdBugE2BM4l+jzw7yGflSwsTtbhecnd9AgMZknQb62+30l3L1+b8/vk0ME0B/YipNI+kmJLP0kZFLKZAc4oAM8QCPod/NSq8DPB27iHWr0KOszgPGE1qo0sJ/ESGzhJ1wSjAtGpKRn5xFnx75J/f7J+uqZSRHN59Jz9HBLD0Yk2D2T1aRHf5M+2AHWbPoKG0XEhiQKyYKtGCMxygtu/V8f97CTWs4PgxDz4kLkyrsEn08JsnzqDo7pqc62MdBRDRgKtBWrajrAL+h3BntxjIVdpGY0Iy6CxmYMgNPAyep9T57/HDIt/G/Duh13bfO+633q5mOBBgMEFnCXXUuZnSEIjxClDC0XYB8SLz34n3AZKQM5vUyWaI/EnHaRLS0OkUrE6efeWmSitaZiIi0nrGjiERNKWXgLFdhgr0xCXM9P7ECWiGZGFTVTkAxmRfaBoddfAPQXtqA/ojvIRPX+cPsIMHgIsGHL8HgICTYAhtKMNdgZwleRsN36E8TQvz5Fz1+enj9ev9NEO6i6H4MPYJeJjBi0gfV74piysmGC1G778hkw3TaqNeXSNdC0XhHAUGfuSKRu3M26EWq2gkuKB+ZtqamKtkwp6il7fY+7bVrxYxtFKqm0lKUuRoVN1WNuZK1QK+Kjp5Q4FGyoUDEmASFMoARcCpDoTxUqftFEbCmqrQriuwkM6p9fFp4UV5WSUsJE1T5giKH1HZFkrLsLnOAfv394cfNdTmgjlTXQNIJ3zYtAM2SQB1oRVA3dQmoaxJQN8sC9VrH5/agDrhBatnbgPqaoL0K+/c4Vo2KQH0r1TA1TeaKnr1cM+Trq5pulW/bqOax2DY5xw+w9YLrp6Xoq9w/qyX6IGejdSWcYm6NeotWq4vnordsIik5n3MNeosh6C2P3cfOVeeue3PSqotl1q266LV6+JWWtgXSc8aoVv0oZUzmRUrtQRVpMpFSFLN7dbvfcW2vq+NUNa7l0mUKA1s+JzzAHvKLMgJ9bzBm7KC0YVPzNRtnngP9q+TEyHPdWBxR6P0He1F/jJeJCkY7N64bxo2EdQsG7jxQmHTWmEfn8ixaMo4Wjnyqb6sAgMLg1+OjzdRNQT8ESss0Cv02Nb2lmMV+cL8fUjnh+bkHTVGv1QtenOStbWb5A1ERjwhSrDrsJg1whlCbD2ivbFBFUEi3BcTTTxTxFpjPGeIZtm0WkWk/kGe3Uq0ltYmNlm2VAHjLhkmOwc9PtCfl+vVlryrqGuE0xuRceT/6lKO6agqvutprqq6lhVKMw5l4yrEu94jtqsQOXBwSrMsMVEU7sBUErRmk5NdMn973dS8osNz8NWULpCKJb4YR065oBVWffEYETM/TXwMyJ2lOZDYbk3R4aY40jOmaPdMoyWy0zTWimGUNvSVRjgKj9op9/baD5HTutQ3dUPZDV9VSDi2SkC5sPKJQgiQ8bLWPL5KweIXcwUYSOOm1VyjEfH0VJMO43EiCVatEbxdJ2GgJ6eHHEHaSbXXXNYC7xRAOyL1wBjGEnSRl0XRb0SoZQSV57t7cvxxY/EBigQssWT9+0F7XCCsvfnA06E4PxPVo2yH+HsfW2uuKakVhcV1Rao+53vtaBplBDTKJNRZ3wirTSn3oFDvpwNCBrgfzLbe91tt7dDa+ICVCfM3ifdDi6HmKpUfziPQbjhhujXvhJO6ph33i9ppUDBk8nN/zS4rcgA6XIO4nJs+FKpQqXkKSrAOCQmZ+3AZBk9Gp0PPZ04vqA4gnWBC0ImG6kCnDn6nzxuY1nlAXKuWLJigYeVSrx+Mw7syyrAtdqFZE/LgTXQlPnh5z97bwYKf5vDyzR2jkQGeI3OY0vKgqgqoyJ8+FMNTAcd1UXTGuC1OLjx3oD3HI5hxV1VS1cFMsNn7q5Dt3HAmQ610ARASQmC4XokTgcRGQhUVN+v9LHkZNYNlf6iOWFMoKF+MckOEQTlDkSyfM27jCsduDztsg8kh+nxLfG6Ok3IXB23fayiOR16+lMA9hukLLR/3oObiotcH+NWSbnKNPQ1ixE3/mD7GTc9hWistAVUWyTEADEucwKG2Jjipf7VuLezg9s7t7OHMIrxHR2T6OXWZEp6rsCbtJjxjSWYUsTjy2ovlu0PuN5S9ia/Oy799jkOMRSF24XqgUxF5yuUP21Z0Fdav27J0FUSv2A54FTUv3Gp4FFXfwMdand0o1g92WFhhqcWnB/Di/tECmPdqlKY9i1PkZOYipHfTJ/OnAG3+hLFPwx7gR7WPoe4P4mYnAG0oY0ihtswVTQcOEHyWxBwCOPbKVH5aEPVpp7NH3oZ0pGaYoGaxoymJk2QusxC1b9HPWY14vmoxmWxQqU2YwGmXJlLgB4CJTxy1T7ZSndclUCoC171XYja78XoX6UwdoMvP8+Alb/5pKTZxZnQBBwvQeijjKGH005r79g9F09H0BCOfHtKw1FdHSNB1NnJWW8INOMOwvgs4wp5aeGpd0g1Md1nU2l8clMdfEOY0aznyzldpHjXUa0wNHWJmgVzs9gKPZEnP4W1u0ZCPrykCILZeSijZBidtt47yezZfvrz/u6Jmb5/u/us9Vj68ygMzis3vWr+eKGR9OAcikelW1QFbr5qHL3r4sP9ihA6AujrjDTzKVS0xXz5totCpTStUrXVttoFdtzqneTiy9hW9pEBqseDGJptSSg0pUF+Qq4/HnoNKWznl0CFoqlxwvocSOKahUrWVxOajMlq0Xuykv6V4qhrKEqU2Wh4pfOne6+qEliVlUrMaciB90HQ9CtYQ9wgxftb99bt/6oXYM+mF6l5LkD6eLfLZshUa1A1R0uT52n792adHVwwP9++OuO2cBY8bf9z/uWCYO5fXb/e19lNx7cfLE2nyy5p4Y1rY4V4bEdajLGFaaTxYc0OunAPdZDrDLlmfXnw9claBExZaWVRFOb2VpaQaXwjDNRVaqHQREx93be+sOhtJBcPTGEFjw6qlcCnLD4BLyJvPmjtZQUwXtFtDs7NMuXibNF1lBOnIxdi5/w8EZsNsCls3xYTd2V2DYikHD11ASr03XSE1H/pVDcJ5ZEWOf6NxOPMyY1sOE4JGEmwRzah6OF1B15i8Ll+3C4ZZQ7UFPEMBRllpZFrotL6mX6F54gjMfQze3bXKDTLzCIjm1RU+9UL6x96tzW/+FJdq8L0Njje8JGtGv397QLF7iF99r9JMQ//cNtMmEkwcd36fqBCcjQKL8t6sUEkMUkmcUTvA4RBsQv1xVfjn0bkB+4Y3UsqikKiE/2Jz8bBsXZsMlA1ZqkQ4fscsmrO7/AQ==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go deleted file mode 100644 index 86bd982f..00000000 --- a/plugins/kv/plugin.go +++ /dev/null @@ -1,159 +0,0 @@ -package kv - -import ( - "fmt" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - // PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. - PluginName string = "kv" - // driver is the mandatory field which should present in every storage - driver string = "driver" - // config key used to detect local configuration for the driver - cfg string = "config" -) - -// Plugin for the unified storage -type Plugin struct { - log logger.Logger - // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. - constructors map[string]kv.Constructor - // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. - storages map[string]kv.Storage - // KV configuration - cfg Config - cfgPlugin config.Configurer -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("kv_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) - if err != nil { - return errors.E(op, err) - } - p.constructors = make(map[string]kv.Constructor, 5) - p.storages = make(map[string]kv.Storage, 5) - p.log = log - p.cfgPlugin = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - errCh := make(chan error, 1) - const op = errors.Op("kv_plugin_serve") - // key - storage name in the config - // value - storage - // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached - // when user requests for example boltdb-south, we should provide that particular preconfigured storage - - for k, v := range p.cfg.Data { - // for example if the key not properly formatted (yaml) - if v == nil { - continue - } - - // check type of the v - // should be a map[string]interface{} - switch t := v.(type) { - // correct type - case map[string]interface{}: - if _, ok := t[driver]; !ok { - errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) - return errCh - } - default: - p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") - continue - } - - // config key for the particular sub-driver kv.memcached.config - configKey := fmt.Sprintf("%s.%s.%s", PluginName, k, cfg) - // at this point we know, that driver field present in the configuration - drName := v.(map[string]interface{})[driver] - - // driver name should be a string - if drStr, ok := drName.(string); ok { - switch { - // local configuration section key - case p.cfgPlugin.Has(configKey): - if _, ok := p.constructors[drStr]; !ok { - p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) - continue - } - - storage, err := p.constructors[drStr].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - // try global then - case p.cfgPlugin.Has(k): - if _, ok := p.constructors[drStr]; !ok { - p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) - continue - } - - // use only key for the driver registration, for example rr-boltdb should be globally available - storage, err := p.constructors[drStr].KVConstruct(k) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - default: - p.log.Error("can't find local or global configuration, this section will be skipped", "local: ", configKey, "global: ", k) - continue - } - } - continue - } - - return errCh -} - -func (p *Plugin) Stop() error { - // stop all attached storages - for k := range p.storages { - p.storages[k].Stop() - } - return nil -} - -// Collects will get all plugins which implement Storage interface -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.GetAllStorageDrivers, - } -} - -func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) { - // save the storage constructor - p.constructors[name.Name()] = constructor -} - -// RPC returns associated rpc service. -func (p *Plugin) RPC() interface{} { - return &rpc{srv: p, log: p.log, storages: p.storages} -} - -func (p *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (p *Plugin) Available() {} diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go deleted file mode 100644 index ad4aefa9..00000000 --- a/plugins/kv/rpc.go +++ /dev/null @@ -1,180 +0,0 @@ -package kv - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -// Wrapper for the plugin -type rpc struct { - // all available storages - storages map[string]kv.Storage - // svc is a plugin implementing Storage interface - srv *Plugin - // Logger - log logger.Logger -} - -// Has accept []*kvv1.Payload proto payload with Storage and Item -func (r *rpc) Has(in *kvv1.Request, out *kvv1.Response) error { - const op = errors.Op("rpc_has") - - if in.GetStorage() == "" { - return errors.E(op, errors.Str("no storage provided")) - } - - keys := make([]string, 0, len(in.GetItems())) - - for i := 0; i < len(in.GetItems()); i++ { - keys = append(keys, in.Items[i].Key) - } - - if st, ok := r.storages[in.GetStorage()]; ok { - ret, err := st.Has(keys...) - if err != nil { - return errors.E(op, err) - } - - // update the value in the pointer - // save the result - out.Items = make([]*kvv1.Item, 0, len(ret)) - for k := range ret { - out.Items = append(out.Items, &kvv1.Item{ - Key: k, - }) - } - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} - -// Set accept proto payload with Storage and Item -func (r *rpc) Set(in *kvv1.Request, _ *kvv1.Response) error { - const op = errors.Op("rpc_set") - - if st, exists := r.storages[in.GetStorage()]; exists { - err := st.Set(in.GetItems()...) - if err != nil { - return errors.E(op, err) - } - - // save the result - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} - -// MGet accept proto payload with Storage and Item -func (r *rpc) MGet(in *kvv1.Request, out *kvv1.Response) error { - const op = errors.Op("rpc_mget") - - keys := make([]string, 0, len(in.GetItems())) - - for i := 0; i < len(in.GetItems()); i++ { - keys = append(keys, in.Items[i].Key) - } - - if st, exists := r.storages[in.GetStorage()]; exists { - ret, err := st.MGet(keys...) - if err != nil { - return errors.E(op, err) - } - - out.Items = make([]*kvv1.Item, 0, len(ret)) - for k := range ret { - out.Items = append(out.Items, &kvv1.Item{ - Key: k, - Value: ret[k], - }) - } - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} - -// MExpire accept proto payload with Storage and Item -func (r *rpc) MExpire(in *kvv1.Request, _ *kvv1.Response) error { - const op = errors.Op("rpc_mexpire") - - if st, exists := r.storages[in.GetStorage()]; exists { - err := st.MExpire(in.GetItems()...) - if err != nil { - return errors.E(op, err) - } - - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} - -// TTL accept proto payload with Storage and Item -func (r *rpc) TTL(in *kvv1.Request, out *kvv1.Response) error { - const op = errors.Op("rpc_ttl") - keys := make([]string, 0, len(in.GetItems())) - - for i := 0; i < len(in.GetItems()); i++ { - keys = append(keys, in.Items[i].Key) - } - - if st, exists := r.storages[in.GetStorage()]; exists { - ret, err := st.TTL(keys...) - if err != nil { - return errors.E(op, err) - } - - out.Items = make([]*kvv1.Item, 0, len(ret)) - for k := range ret { - out.Items = append(out.Items, &kvv1.Item{ - Key: k, - Timeout: ret[k], - }) - } - - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} - -// Delete accept proto payload with Storage and Item -func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error { - const op = errors.Op("rcp_delete") - - keys := make([]string, 0, len(in.GetItems())) - - for i := 0; i < len(in.GetItems()); i++ { - keys = append(keys, in.Items[i].Key) - } - if st, exists := r.storages[in.GetStorage()]; exists { - err := st.Delete(keys...) - if err != nil { - return errors.E(op, err) - } - - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} - -// Clear clean the storage -func (r *rpc) Clear(in *kvv1.Request, _ *kvv1.Response) error { - const op = errors.Op("rcp_delete") - - if st, exists := r.storages[in.GetStorage()]; exists { - err := st.Clear() - if err != nil { - return errors.E(op, err) - } - - return nil - } - - return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) -} |