diff options
Diffstat (limited to 'plugins/kv/drivers/memory/plugin.go')
-rw-r--r-- | plugins/kv/drivers/memory/plugin.go | 229 |
1 files changed, 13 insertions, 216 deletions
diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 91fe89d3..acc6023d 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -1,10 +1,6 @@ package memory import ( - "strings" - "sync" - "time" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" @@ -16,12 +12,11 @@ const PluginName = "memory" type Plugin struct { // heap is user map for the key-value pairs - heap sync.Map stop chan struct{} log logger.Logger - cfg *Config cfgPlugin config.Configurer + drivers uint } func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -37,231 +32,33 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh + return make(chan error, 1) } func (s *Plugin) Stop() error { - const op = errors.Op("in_memory_plugin_stop") - err := s.Close() - if err != nil { - return errors.E(op, err) + if s.drivers > 0 { + for i := uint(0); i < s.drivers; i++ { + // send close signal to every driver + s.stop <- struct{}{} + } } return nil } -func (s *Plugin) Configure(key string) (kv.Storage, error) { - const op = errors.Op("inmemory_plugin_configure") - err := s.cfgPlugin.UnmarshalKey(key, &s.cfg) +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("inmemory_plugin_provide") + st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { return nil, errors.E(op, err) } - // initialize default keys - s.cfg.InitDefaults() + // save driver number to release resources after Stop + s.drivers++ - // start in-memory gc for kv - go s.gc() - - return s, nil -} - -func (s *Plugin) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("in_memory_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if _, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = true - } - } - - return m, nil -} - -func (s *Plugin) Get(key string) ([]byte, error) { - const op = errors.Op("in_memory_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if data, exist := s.heap.Load(key); exist { - // here might be a panic - // but data only could be a string, see Set function - return []byte(data.(kv.Item).Value), nil - } - return nil, nil -} - -func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("in_memory_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - - for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(kv.Item).Value - } - } - - return m, nil -} - -func (s *Plugin) Set(items ...kv.Item) error { - const op = errors.Op("in_memory_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - // TTL is set - if items[i].TTL != "" { - // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return err - } - } - - s.heap.Store(items[i].Key, items[i]) - } - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (s *Plugin) MExpire(items ...kv.Item) error { - const op = errors.Op("in_memory_plugin_mexpire") - for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // if key exist, overwrite it value - if pItem, ok := s.heap.Load(items[i].Key); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].TTL) - if err != nil { - return errors.E(op, err) - } - tmp := pItem.(kv.Item) - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, kv.Item{ - Key: items[i].Key, - Value: tmp.Value, - TTL: items[i].TTL, - }) - } - } - - return nil -} - -func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("in_memory_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]interface{}, len(keys)) - - for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(kv.Item).TTL - } - } - return m, nil -} - -func (s *Plugin) Delete(keys ...string) error { - const op = errors.Op("in_memory_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - s.heap.Delete(keys[i]) - } - return nil -} - -// Close clears the in-memory storage -func (s *Plugin) Close() error { - s.stop <- struct{}{} - return nil + return st, nil } // Name returns plugin user-friendly name func (s *Plugin) Name() string { return PluginName } - -// ================================== PRIVATE ====================================== - -func (s *Plugin) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) - for { - select { - case <-s.stop: - ticker.Stop() - return - case now := <-ticker.C: - // check every second - s.heap.Range(func(key, value interface{}) bool { - v := value.(kv.Item) - if v.TTL == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.TTL) - if err != nil { - return false - } - - if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) - } - return true - }) - } - } -} |