summaryrefslogtreecommitdiff
path: root/plugins/kv/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/plugin.go')
-rw-r--r--plugins/kv/plugin.go208
1 files changed, 208 insertions, 0 deletions
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
new file mode 100644
index 00000000..efe92252
--- /dev/null
+++ b/plugins/kv/plugin.go
@@ -0,0 +1,208 @@
+package kv
+
+import (
+ "fmt"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName string = "kv"
+
+const (
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ memcached string = "memcached"
+ boltdb string = "boltdb"
+ redis string = "redis"
+ memory string = "memory"
+)
+
+// Plugin for the unified storage
+type Plugin struct {
+ log logger.Logger
+ // drivers contains general storage drivers, such as boltdb, memory, memcached, redis.
+ drivers map[string]StorageDriver
+ // storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
+ storages map[string]Storage
+ // KV configuration
+ cfg Config
+ cfgPlugin config.Configurer
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("kv_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ p.drivers = make(map[string]StorageDriver, 5)
+ p.storages = make(map[string]Storage, 5)
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error { //nolint:gocognit
+ errCh := make(chan error, 1)
+ const op = errors.Op("kv_plugin_serve")
+ // key - storage name in the config
+ // value - storage
+ /*
+ For example we can have here 2 storages (but they are not pre-configured)
+ for the boltdb and memcached
+ We should provide here the actual configs for the all requested storages
+ kv:
+ boltdb-south:
+ driver: boltdb
+ dir: "tests/rr-bolt"
+ file: "rr.db"
+ bucket: "rr"
+ permissions: 777
+ ttl: 40s
+
+ boltdb-north:
+ driver: boltdb
+ dir: "tests/rr-bolt"
+ file: "rr.db"
+ bucket: "rr"
+ permissions: 777
+ ttl: 40s
+
+ memcached:
+ driver: memcached
+ addr: [ "localhost:11211" ]
+
+
+ For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ when user requests for example boltdb-south, we should provide that particular preconfigured storage
+ */
+ for k, v := range p.cfg.Data {
+ if _, ok := v.(map[string]interface{})[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, k)
+ // at this point we know, that driver field present in the configuration
+ switch v.(map[string]interface{})[driver] {
+ case memcached:
+ if _, ok := p.drivers[memcached]; !ok {
+ p.log.Warn("no memcached drivers registered", "registered", p.drivers)
+ continue
+ }
+
+ storage, err := p.drivers[memcached].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+
+ case boltdb:
+ if _, ok := p.drivers[boltdb]; !ok {
+ p.log.Warn("no boltdb drivers registered", "registered", p.drivers)
+ continue
+ }
+
+ storage, err := p.drivers[boltdb].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case memory:
+ if _, ok := p.drivers[memory]; !ok {
+ p.log.Warn("no in-memory drivers registered", "registered", p.drivers)
+ continue
+ }
+
+ storage, err := p.drivers[memory].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case redis:
+ if _, ok := p.drivers[redis]; !ok {
+ p.log.Warn("no redis drivers registered", "registered", p.drivers)
+ continue
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ storage, err := p.drivers[redis].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case p.cfgPlugin.Has(redis):
+ storage, err := p.drivers[redis].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ continue
+ default:
+ // otherwise - error, no local or global config
+ p.log.Warn("no global or local redis configuration provided", "key", configKey)
+ continue
+ }
+
+ default:
+ p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
+ }
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+// Collects will get all plugins which implement Storage interface
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetAllStorageDrivers,
+ }
+}
+
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) {
+ // save the storage driver
+ p.drivers[name.Name()] = storage
+}
+
+// RPC returns associated rpc service.
+func (p *Plugin) RPC() interface{} {
+ return &rpc{srv: p, log: p.log, storages: p.storages}
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// Available interface implementation
+func (p *Plugin) Available() {
+}