summaryrefslogtreecommitdiff
path: root/plugins/kv/memory/storage.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/memory/storage.go')
-rw-r--r--plugins/kv/memory/storage.go263
1 files changed, 263 insertions, 0 deletions
diff --git a/plugins/kv/memory/storage.go b/plugins/kv/memory/storage.go
new file mode 100644
index 00000000..1b6cb580
--- /dev/null
+++ b/plugins/kv/memory/storage.go
@@ -0,0 +1,263 @@
+package memory
+
+import (
+ "context"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "memory"
+
+type Plugin struct {
+ heap *sync.Map
+ stop chan struct{}
+
+ log logger.Logger
+ cfg *Config
+}
+
+func NewInMemoryStorage() kv.Storage {
+ p := &Plugin{
+ heap: &sync.Map{},
+ stop: make(chan struct{}),
+ }
+
+ go p.gc()
+
+ return p
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("in-memory storage init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.log = log
+ // init in-memory
+ s.heap = &sync.Map{}
+ s.stop = make(chan struct{}, 1)
+ return nil
+}
+
+func (s Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ // start in-memory gc for kv
+ go s.gc()
+
+ return errCh
+}
+
+func (s Plugin) Stop() error {
+ const op = errors.Op("in-memory storage stop")
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+ const op = errors.Op("in-memory storage Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool)
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if _, ok := s.heap.Load(key); ok {
+ m[key] = true
+ }
+ }
+
+ return m, nil
+}
+
+func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+ const op = errors.Op("in-memory storage Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if data, exist := s.heap.Load(key); exist {
+ // here might be a panic
+ // but data only could be a string, see Set function
+ return []byte(data.(kv.Item).Value), nil
+ }
+ return nil, nil
+}
+
+func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage MGet")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ if value, ok := s.heap.Load(key); ok {
+ m[key] = value
+ }
+ }
+
+ return m, nil
+}
+
+func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("in-memory storage Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for _, item := range items {
+ // TTL is set
+ if item.TTL != "" {
+ // check the TTL in the item
+ _, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+ }
+
+ s.heap.Store(item.Key, item)
+ }
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+ const op = errors.Op("in-memory storage MExpire")
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // if key exist, overwrite it value
+ if pItem, ok := s.heap.Load(item.Key); ok {
+ // check that time is correct
+ _, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ tmp := pItem.(kv.Item)
+ // guess that t is in the future
+ // in memory is just FOR TESTING PURPOSES
+ // LOGIC ISN'T IDEAL
+ s.heap.Store(item.Key, kv.Item{
+ Key: item.Key,
+ Value: tmp.Value,
+ TTL: item.TTL,
+ })
+ }
+ }
+
+ return nil
+}
+
+func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage TTL")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ if item, ok := s.heap.Load(key); ok {
+ m[key] = item.(kv.Item).TTL
+ }
+ }
+ return m, nil
+}
+
+func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+ const op = errors.Op("in-memory storage Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for _, key := range keys {
+ s.heap.Delete(key)
+ }
+ return nil
+}
+
+// Close clears the in-memory storage
+func (s Plugin) Close() error {
+ s.heap = &sync.Map{}
+ s.stop <- struct{}{}
+ return nil
+}
+
+// ================================== PRIVATE ======================================
+
+func (s *Plugin) gc() {
+ // TODO check
+ ticker := time.NewTicker(time.Millisecond * 500)
+ for {
+ select {
+ case <-s.stop:
+ ticker.Stop()
+ return
+ case now := <-ticker.C:
+ // check every second
+ s.heap.Range(func(key, value interface{}) bool {
+ v := value.(kv.Item)
+ if v.TTL == "" {
+ return true
+ }
+
+ t, err := time.Parse(time.RFC3339, v.TTL)
+ if err != nil {
+ return false
+ }
+
+ if now.After(t) {
+ s.heap.Delete(key)
+ }
+ return true
+ })
+ }
+ }
+}