package kv import ( "fmt" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) const PluginName string = "kv" const ( // driver is the mandatory field which should present in every storage driver string = "driver" memcached string = "memcached" boltdb string = "boltdb" redis string = "redis" memory string = "memory" ) // Plugin for the unified storage type Plugin struct { log logger.Logger // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. drivers map[string]Storage // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. storages map[string]Storage // KV configuration cfg Config } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("kv_plugin_init") if !cfg.Has(PluginName) { return errors.E(errors.Disabled) } err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) if err != nil { return errors.E(op, err) } p.drivers = make(map[string]Storage, 5) p.storages = make(map[string]Storage, 5) p.log = log return nil } func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("kv_plugin_serve") // key - storage name in the config // value - storage /* For example we can have here 2 storages (but they are not pre-configured) for the boltdb and memcached We should provide here the actual configs for the all requested storages kv: default: driver: memory boltdb-south: driver: boltdb dir: "tests/rr-bolt" file: "rr.db" bucket: "rr" permissions: 777 ttl: 40s boltdb-north: driver: boltdb dir: "tests/rr-bolt" file: "rr.db" bucket: "rr" permissions: 777 ttl: 40s memcached: driver: memcached addr: [ "localhost:11211" ] For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached when user requests for example boltdb-south, we should provide that particular preconfigured storage */ for k, v := range p.cfg.Data { if _, ok := v.(map[string]interface{})[driver]; !ok { errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) } configKey := fmt.Sprintf("%s.%s", PluginName, k) // at this point we know, that driver field present in the cofiguration switch v.(map[string]interface{})[driver] { case memcached: if _, ok := p.drivers[memcached]; !ok { continue } storage, err := p.drivers[memcached].Configure(configKey) if err != nil { errCh <- errors.E(op, err) return errCh } // save the storage p.storages[k] = storage case boltdb: if _, ok := p.drivers[boltdb]; !ok { continue } storage, err := p.drivers[boltdb].Configure(configKey) if err != nil { errCh <- errors.E(op, err) return errCh } // save the storage p.storages[k] = storage case memory: if _, ok := p.drivers[memory]; !ok { continue } storage, err := p.drivers[memory].Configure(configKey) if err != nil { errCh <- errors.E(op, err) return errCh } // save the storage p.storages[k] = storage case redis: if _, ok := p.drivers[redis]; !ok { continue } storage, err := p.drivers[redis].Configure(configKey) if err != nil { errCh <- errors.E(op, err) return errCh } // save the storage p.storages[k] = storage default: errCh <- errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])) } } return errCh } func (p *Plugin) Stop() error { return nil } // Collects will get all plugins which implement Storage interface func (p *Plugin) Collects() []interface{} { return []interface{}{ p.GetAllStorageDrivers, } } func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage Storage) { // save the storage driver p.drivers[name.Name()] = storage } // RPC returns associated rpc service. func (p *Plugin) RPC() interface{} { return &rpc{srv: p, log: p.log, storages: p.storages} } func (p *Plugin) Name() string { return PluginName }