Diffstat (limited to 'plugins')
-rw-r--r--  plugins/http/config.go                |   5
-rw-r--r--  plugins/http/constants.go             |   2
-rw-r--r--  plugins/http/handler.go               |  29
-rw-r--r--  plugins/http/plugin.go                |  12
-rw-r--r--  plugins/kv/boltdb/config.go           |  24
-rw-r--r--  plugins/kv/boltdb/plugin.go           | 452
-rw-r--r--  plugins/kv/boltdb/plugin_unit_test.go | 531
-rw-r--r--  plugins/kv/interface.go               |  41
-rw-r--r--  plugins/kv/memcached/config.go        |  10
-rw-r--r--  plugins/kv/memcached/plugin.go        | 252
-rw-r--r--  plugins/kv/memcached/storage_test.go  | 432
-rw-r--r--  plugins/kv/memory/config.go           |  15
-rw-r--r--  plugins/kv/memory/plugin.go           | 262
-rw-r--r--  plugins/kv/memory/storage_test.go     | 473
-rw-r--r--  plugins/kv/rpc.go                     | 110
-rw-r--r--  plugins/reload/plugin.go              |   2
-rw-r--r--  plugins/reload/watcher.go             |   4
-rw-r--r--  plugins/server/config.go              |   1
-rw-r--r--  plugins/server/interface.go           |   1
-rw-r--r--  plugins/server/plugin.go              |   3
20 files changed, 2637 insertions(+), 24 deletions(-)
diff --git a/plugins/http/config.go b/plugins/http/config.go
index 00d2940b..3b670c86 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -11,8 +11,10 @@ import (
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
)
+// Cidrs is a slice of IPNet addresses
type Cidrs []*net.IPNet
+// IsTrusted checks whether the ip address is within one of the trusted addresses provided in the config
func (c *Cidrs) IsTrusted(ip string) bool {
if len(*c) == 0 {
return false
@@ -137,7 +139,7 @@ func (c *Config) EnableFCGI() bool {
return c.FCGI.Address != ""
}
-// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+// InitDefaults populates default Config values and returns an error if the resulting Config is not valid.
func (c *Config) InitDefaults() error {
if c.Pool == nil {
// default pool
@@ -202,6 +204,7 @@ func (c *Config) InitDefaults() error {
return c.Valid()
}
+// ParseCIDRs parses the provided subnets and returns them as a slice of IPNet addresses (Cidrs)
func ParseCIDRs(subnets []string) (Cidrs, error) {
c := make(Cidrs, 0, len(subnets))
for _, cidr := range subnets {
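[editor's note] The new Cidrs/ParseCIDRs/IsTrusted helpers are small enough to show end to end. A minimal usage sketch; the subnet list below is an illustrative assumption, not taken from any shipped config:

package main

import (
    "fmt"

    httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
)

func main() {
    // parse the trusted subnets once, typically while hydrating the config
    cidrs, err := httpPlugin.ParseCIDRs([]string{"10.0.0.0/8", "127.0.0.1/32"})
    if err != nil {
        panic(err)
    }

    // later, remote addresses are checked against the parsed ranges
    fmt.Println(cidrs.IsTrusted("10.1.2.3"))
    fmt.Println(cidrs.IsTrusted("8.8.8.8"))
}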
diff --git a/plugins/http/constants.go b/plugins/http/constants.go
index 773d1f46..c3d5c589 100644
--- a/plugins/http/constants.go
+++ b/plugins/http/constants.go
@@ -3,4 +3,6 @@ package http
import "net/http"
var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
+
+// TrailerHeaderKey is the canonical Trailer http header key
var TrailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
index 15954f96..9c40cdfc 100644
--- a/plugins/http/handler.go
+++ b/plugins/http/handler.go
@@ -23,13 +23,9 @@ const (
EventError
)
+// MB is 1024*1024 bytes (one megabyte)
const MB = 1024 * 1024
-type Handle interface {
- AddListener(l events.Listener)
- ServeHTTP(w http.ResponseWriter, r *http.Request)
-}
-
// ErrorEvent represents singular http error event.
type ErrorEvent struct {
// Request contains client request, must not be stored.
@@ -68,7 +64,7 @@ func (e *ResponseEvent) Elapsed() time.Duration {
// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
// parsed files and query, payload will include parsed form dataTree (if any).
-type handler struct {
+type Handler struct {
maxRequestSize uint64
uploads UploadsConfig
trusted Cidrs
@@ -78,11 +74,12 @@ type handler struct {
lsn events.Listener
}
-func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) {
+// NewHandler returns a new Handler for the given pool, uploads config and trusted subnets
+func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (*Handler, error) {
if pool == nil {
return nil, errors.E(errors.Str("pool should be initialized"))
}
- return &handler{
+ return &Handler{
maxRequestSize: maxReqSize * MB,
uploads: uploads,
pool: pool,
@@ -90,8 +87,8 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool po
}, nil
}
-// Listen attaches handler event controller.
-func (h *handler) AddListener(l events.Listener) {
+// AddListener attaches a handler event listener.
+func (h *Handler) AddListener(l events.Listener) {
h.mul.Lock()
defer h.mul.Unlock()
@@ -99,7 +96,7 @@ func (h *handler) AddListener(l events.Listener) {
}
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
-func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
const op = errors.Op("ServeHTTP")
start := time.Now()
@@ -148,7 +145,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
-func (h *handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Time, op errors.Op) error {
+func (h *Handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Time, op errors.Op) error {
if length := r.Header.Get("content-length"); length != "" {
if size, err := strconv.ParseInt(length, 10, 64); err != nil {
h.handleError(w, r, err, start)
@@ -162,7 +159,7 @@ func (h *handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Tim
}
// handleError sends error.
-func (h *handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) {
+func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) {
h.mul.Lock()
defer h.mul.Unlock()
// if pipe is broken, there is no sense to write the header
@@ -186,19 +183,19 @@ func (h *handler) handleError(w http.ResponseWriter, r *http.Request, err error,
}
// handleResponse triggers response event.
-func (h *handler) handleResponse(req *Request, resp *Response, start time.Time) {
+func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
h.throw(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
}
// throw invokes event handler if any.
-func (h *handler) throw(event interface{}) {
+func (h *Handler) throw(event interface{}) {
if h.lsn != nil {
h.lsn(event)
}
}
// get real ip passing multiple proxy
-func (h *handler) resolveIP(r *Request) {
+func (h *Handler) resolveIP(r *Request) {
if h.trusted.IsTrusted(r.RemoteAddr) == false {
return
}
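[editor's note] Since the handler is now exported as *Handler (and the old Handle interface is gone), callers hold the concrete type and can still mount it as a regular http.Handler. A minimal sketch, assuming an already-initialized worker pool from the server plugin and that events.Listener is the usual func(event interface{}) callback:

package example

import (
    "log"
    "net/http"

    "github.com/spiral/roadrunner/v2/pkg/pool"
    httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
)

func mount(p pool.Pool, uploads httpPlugin.UploadsConfig, trusted httpPlugin.Cidrs) error {
    // 10 is the max request size in MB; NewHandler multiplies it by MB internally
    h, err := httpPlugin.NewHandler(10, uploads, trusted, p)
    if err != nil {
        return err
    }

    // subscribe to the ErrorEvent/ResponseEvent values emitted by the handler
    h.AddListener(func(event interface{}) {
        log.Printf("http event: %#v", event)
    })

    // *Handler implements http.Handler, so it plugs into the standard server
    return http.ListenAndServe(":8080", h)
}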
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2651f305..e6aba78b 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -44,7 +44,7 @@ type Middleware interface {
type middleware map[string]Middleware
-// Service manages pool, http servers.
+// Plugin is the main http plugin structure; it manages the worker pool and http servers
type Plugin struct {
sync.RWMutex
@@ -60,7 +60,7 @@ type Plugin struct {
pool pool.Pool
// servers RR handler
- handler Handle
+ handler *Handler
// servers
http *http.Server
@@ -267,15 +267,17 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RUnlock()
}
-// Server returns associated pool workers
+// Workers returns associated pool workers
func (s *Plugin) Workers() []worker.BaseProcess {
return s.pool.Workers()
}
+// Name implements the endure.Named interface and returns the plugin name
func (s *Plugin) Name() string {
return PluginName
}
+// Reset destroys the old pool and replaces it with a new one, waiting for the old pool to die
func (s *Plugin) Reset() error {
s.Lock()
defer s.Unlock()
@@ -319,12 +321,14 @@ func (s *Plugin) Reset() error {
return nil
}
+// Collects returns the functions endure uses to collect http middlewares
func (s *Plugin) Collects() []interface{} {
return []interface{}{
s.AddMiddleware,
}
}
+// AddMiddleware registers an http middleware by its name (endure.Named) and Middleware implementation
func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
s.mdwr[name.Name()] = m
}
@@ -414,7 +418,7 @@ func (s *Plugin) initSSL() *http.Server {
hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X
if hasGCMAsm {
- // If AES-GCM hardware is provided then prioritise AES-GCM
+ // If AES-GCM hardware is provided then prioritize AES-GCM
// cipher suites.
topCipherSuites = []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go
new file mode 100644
index 00000000..b2e1e636
--- /dev/null
+++ b/plugins/kv/boltdb/config.go
@@ -0,0 +1,24 @@
+package boltdb
+
+type Config struct {
+ // Dir is a directory to store the DB files
+ Dir string
+ // File is boltDB file. No need to create it by your own,
+ // boltdb driver is able to create the file, or read existing
+ File string
+ // Bucket to store data in boltDB
+ Bucket string
+ // db file permissions
+ Permissions int
+ // Interval is the keys GC check interval, in seconds
+ Interval uint `yaml:"interval"`
+}
+
+// InitDefaults initializes default values for the boltdb
+func (s *Config) InitDefaults() {
+ s.Dir = "." // current dir
+ s.Bucket = "rr" // default bucket name
+ s.File = "rr.db" // default file name
+ s.Permissions = 0777 // free for all
+ s.Interval = 60 // default is 60 seconds timeout
+}
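[editor's note] For reference, a minimal standalone sketch of how these defaults are consumed by the driver: plugin.go (below) joins Dir and File, applies Permissions as an os.FileMode, and makes sure the bucket exists. The values here are the InitDefaults ones:

package main

import (
    "os"
    "path"

    bolt "go.etcd.io/bbolt"
)

func main() {
    dir, file, bucket, perm := ".", "rr.db", "rr", 0777

    db, err := bolt.Open(path.Join(dir, file), os.FileMode(perm), nil)
    if err != nil {
        panic(err)
    }
    defer func() { _ = db.Close() }()

    // create the bucket if it does not exist, as the plugin's Init does
    err = db.Update(func(tx *bolt.Tx) error {
        _, err := tx.CreateBucketIfNotExists([]byte(bucket))
        return err
    })
    if err != nil {
        panic(err)
    }
}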
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go
new file mode 100644
index 00000000..6cfc49f6
--- /dev/null
+++ b/plugins/kv/boltdb/plugin.go
@@ -0,0 +1,452 @@
+package boltdb
+
+import (
+ "bytes"
+ "encoding/gob"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ bolt "go.etcd.io/bbolt"
+)
+
+const PluginName = "boltdb"
+
+// Plugin is the BoltDB K/V storage plugin
+type Plugin struct {
+ // db instance
+ DB *bolt.DB
+ // name should be UTF-8
+ bucket []byte
+
+ // config for RR integration
+ cfg *Config
+
+ // logger
+ log logger.Logger
+
+ // gc contains keys which have timeouts (TTL)
+ gc *sync.Map
+ // default timeout for cache cleanup is 1 minute
+ timeout time.Duration
+
+ // stop is used to stop keys GC and close boltdb connection
+ stop chan struct{}
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ const op = errors.Op("boltdb plugin init")
+ s.cfg = &Config{}
+
+ s.cfg.InitDefaults()
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ // set the logger
+ s.log = log
+
+ db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit is invoked via db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb Update")
+ _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket))
+ if err != nil {
+ return errors.E(op, upOp, err)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.DB = db
+ s.bucket = []byte(s.cfg.Bucket)
+ s.stop = make(chan struct{})
+ s.timeout = time.Duration(s.cfg.Interval) * time.Second
+ s.gc = &sync.Map{}
+
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ // start the TTL gc
+ go s.gcPhase()
+
+ return errCh
+}
+
+func (s *Plugin) Stop() error {
+ const op = errors.Op("boltdb stop")
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("boltdb Has")
+ s.log.Debug("boltdb HAS method called", "args", keys)
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+
+ // this is a read-only transaction
+ err := s.DB.View(func(tx *bolt.Tx) error {
+ // Get retrieves the value for a key in the bucket.
+ // Returns a nil value if the key does not exist or if the key is a nested bucket.
+ // The returned value is only valid for the life of the transaction.
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ exist := b.Get([]byte(keys[i]))
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ s.log.Debug("boltdb HAS method finished")
+ return m, nil
+}
+
+// Get retrieves the value for a key in the bucket.
+// Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
+func (s *Plugin) Get(key string) ([]byte, error) {
+ const op = errors.Op("boltdb Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ var val []byte
+ err := s.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ val = b.Get([]byte(key))
+
+ // try to decode values
+ if val != nil {
+ buf := bytes.NewReader(val)
+ decoder := gob.NewDecoder(buf)
+
+ var i string
+ err := decoder.Decode(&i)
+ if err != nil {
+ // decoding failed, return the error
+ return errors.E(op, err)
+ }
+
+ // set the value
+ val = []byte(i)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return val, nil
+}
+
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb MGet")
+ // defence
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ err := s.DB.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ buf := new(bytes.Buffer)
+ var out string
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
+ if value != nil {
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = ""
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return m, nil
+}
+
+// Set puts the K/V pairs into the bolt DB
+func (s *Plugin) Set(items ...kv.Item) error {
+ const op = errors.Op("boltdb Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // start writable transaction
+ tx, err := s.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(s.bucket)
+ // use access by index to avoid copying
+ for i := range items {
+ // performance note: pass a prepared bytes slice with initial cap
+ // we can't move buf and the encoder out of the loop: both have to be cleared between items,
+ // and a gob encoder would still carry the previous data without re-initialization
+ buf := bytes.Buffer{}
+ encoder := gob.NewEncoder(&buf)
+ if items[i] == (kv.Item{}) {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // buf.Bytes will copy the underlying slice. Take a look in case of performance problems
+ err = b.Put([]byte(items[i].Key), buf.Bytes())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // if there are no errors and the TTL is set, we put the key with its deadline into the hashmap for the future GC check
+ // we do not need mutex here, since we use sync.Map
+ if items[i].TTL != "" {
+ // check correctness of provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ // Store key TTL in the separate map
+ s.gc.Store(items[i].Key, items[i].TTL)
+ }
+
+ buf.Reset()
+ }
+
+ return nil
+}
+
+// Delete removes the provided keys from the DB
+func (s *Plugin) Delete(keys ...string) error {
+ const op = errors.Op("boltdb Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ // start writable transaction
+ tx, err := s.DB.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ defer func() {
+ err = tx.Commit()
+ if err != nil {
+ errRb := tx.Rollback()
+ if errRb != nil {
+ s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb)
+ }
+ }
+ }()
+
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+
+ for _, key := range keys {
+ err = b.Delete([]byte(key))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+// MExpire sets the expiration time for the provided keys.
+// If a key already has an expiration time, it will be overwritten.
+func (s *Plugin) MExpire(items ...kv.Item) error {
+ const op = errors.Op("boltdb MExpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ s.gc.Store(items[i].Key, items[i].TTL)
+ }
+ return nil
+}
+
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("boltdb TTL")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := s.gc.Load(keys[i]); ok {
+ // the type assertion is safe here: only kv.Item.TTL (an RFC3339 string) is ever stored in the gc map
+ m[keys[i]] = item.(string)
+ }
+ }
+ return m, nil
+}
+
+// Close the DB connection
+func (s *Plugin) Close() error {
+ // stop the keys GC
+ s.stop <- struct{}{}
+ return s.DB.Close()
+}
+
+// RPC returns the associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+// ========================= PRIVATE =================================
+
+func (s *Plugin) gcPhase() {
+ t := time.NewTicker(s.timeout)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ // capture the current time before the range loop starts, to be fair
+ now := time.Now()
+ s.gc.Range(func(key, value interface{}) bool {
+ const op = errors.Op("gcPhase")
+ k := key.(string)
+ v, err := time.Parse(time.RFC3339, value.(string))
+ if err != nil {
+ return false
+ }
+
+ if now.After(v) {
+ // time expired
+ s.gc.Delete(k)
+ s.log.Debug("key deleted", "key", k)
+ err := s.DB.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(s.bucket)
+ if b == nil {
+ return errors.E(op, errors.NoSuchBucket)
+ }
+ err := b.Delete([]byte(k))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ s.log.Error("error during the gc phase of update", "error", err)
+ // TODO: this error is only logged, which means the ticker will still be active;
+ // to prevent that, we would need to invoke t.Stop()
+ return false
+ }
+ }
+ return true
+ })
+ case <-s.stop:
+ return
+ }
+ }
+}
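[editor's note] Worth noting: values are stored gob-encoded (Set encodes items[i].Value, Get decodes the bytes back into a string). A standalone sketch of that round trip, independent of bolt itself:

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
)

func main() {
    // encode, as Set does before b.Put
    buf := bytes.Buffer{}
    if err := gob.NewEncoder(&buf).Encode("hello world"); err != nil {
        panic(err)
    }

    // decode, as Get does after b.Get
    var out string
    if err := gob.NewDecoder(bytes.NewReader(buf.Bytes())).Decode(&out); err != nil {
        panic(err)
    }
    fmt.Println(out) // hello world
}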
diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go
new file mode 100644
index 00000000..2459e493
--- /dev/null
+++ b/plugins/kv/boltdb/plugin_unit_test.go
@@ -0,0 +1,531 @@
+package boltdb
+
+import (
+ "os"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/stretchr/testify/assert"
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
+)
+
+// NewBoltClient instantiate new BOLTDB client
+// The parameters are:
+// path string -- path to the database file (can be placed anywhere); if the file does not exist, it will be created
+// perm os.FileMode -- file permissions, for example 0777
+// options *bolt.Options -- boltDB options, such as timeouts, NoGrowSync and others
+// bucket string -- name of the bucket to use, should be UTF-8
+func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
+ const op = errors.Op("newBoltClient")
+ db, err := bolt.Open(path, perm, options)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // bucket should be SET
+ if bucket == "" {
+ return nil, errors.E(op, errors.Str("bucket should be set"))
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit is invoked via db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ _, err = tx.CreateBucketIfNotExists([]byte(bucket))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if TTL is not set, make it default
+ if ttl == 0 {
+ ttl = time.Minute
+ }
+
+ l, _ := zap.NewDevelopment()
+ s := &Plugin{
+ DB: db,
+ bucket: []byte(bucket),
+ stop: make(chan struct{}),
+ timeout: ttl,
+ gc: &sync.Map{},
+ log: logger.NewZapAdapter(l),
+ }
+
+ // start the TTL gc
+ go s.gcPhase()
+
+ return s, nil
+}
+
+func initStorage() kv.Storage {
+ storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second)
+ if err != nil {
+ panic(err)
+ }
+ return storage
+}
+
+func cleanup(t *testing.T, path string) {
+ err := os.RemoveAll(path)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ // set is writable transaction
+ // it should stop readable
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has("key")
+ assert.NoError(t, err)
+ // key should be present now
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete("key" + strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world2",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get("key")
+ assert.NoError(t, err)
+
+ if string(res) != "hello world" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete("key", "key2"))
+ // check that keys are not present
+ res, err = s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ // check
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has("")
+ assert.Error(t, err)
+
+ _, err = s.Get("")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ assert.Error(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "asdf",
+ }))
+
+ _, err = s.Has("key")
+ assert.NoError(t, err)
+
+ assert.Error(t, s.Set(kv.Item{}))
+
+ err = s.Delete("")
+ assert.Error(t, err)
+
+ err = s.Delete("key", "")
+ assert.Error(t, err)
+
+ err = s.Delete("key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete("key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ cleanup(t, "rr.db")
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 2)
+ m, err := s.TTL("key", "key2")
+ assert.NoError(t, err)
+
+ // remove a precision 4.02342342 -> 4
+ keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // remove a precision 4.02342342 -> 4
+ key2TTL, err := strconv.Atoi(m["key2"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.True(t, keyTTL < 5)
+ assert.True(t, key2TTL < 5)
+
+ time.Sleep(time.Second * 4)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
new file mode 100644
index 00000000..c1367cdf
--- /dev/null
+++ b/plugins/kv/interface.go
@@ -0,0 +1,41 @@
+package kv
+
+// Item represents general storage item
+type Item struct {
+ // Key of item
+ Key string
+ // Value of item
+ Value string
+ // live until time provided by TTL in RFC 3339 format
+ TTL string
+}
+
+// Storage represents single abstract storage.
+type Storage interface {
+ // Has checks if value exists.
+ Has(keys ...string) (map[string]bool, error)
+
+ // Get loads value content into a byte slice.
+ Get(key string) ([]byte, error)
+
+ // MGet loads content of multiple values
+ // Returns the map with existing keys and associated values
+ MGet(keys ...string) (map[string]interface{}, error)
+
+ // Set used to upload item to KV with TTL
+ // 0 value in TTL means no TTL
+ Set(items ...Item) error
+
+ // MExpire sets the TTL for multiply keys
+ MExpire(items ...Item) error
+
+ // TTL return the rest time to live for provided keys
+ // Not supported for the memcached and boltdb
+ TTL(keys ...string) (map[string]interface{}, error)
+
+ // Delete one or multiple keys.
+ Delete(keys ...string) error
+
+ // Close closes the storage and underlying resources.
+ Close() error
+}
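[editor's note] Taken together, the interface is meant to be used roughly like the sketch below; it assumes `s` is any initialized Storage implementation (boltdb, memcached or memory) and shows that TTL travels as an RFC3339 timestamp string:

package example

import (
    "fmt"
    "time"

    "github.com/spiral/roadrunner/v2/plugins/kv"
)

func demo(s kv.Storage) error {
    // a TTL ten minutes from now, in the RFC3339 format the drivers parse
    ttl := time.Now().Add(10 * time.Minute).Format(time.RFC3339)

    if err := s.Set(kv.Item{Key: "greeting", Value: "hello world", TTL: ttl}); err != nil {
        return err
    }

    val, err := s.Get("greeting")
    if err != nil {
        return err
    }
    fmt.Println(string(val))

    // missing keys are simply absent from the result map
    exists, err := s.Has("greeting", "missing")
    if err != nil {
        return err
    }
    fmt.Println(exists["greeting"], exists["missing"]) // true false

    return s.Delete("greeting")
}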
diff --git a/plugins/kv/memcached/config.go b/plugins/kv/memcached/config.go
new file mode 100644
index 00000000..62f29ef2
--- /dev/null
+++ b/plugins/kv/memcached/config.go
@@ -0,0 +1,10 @@
+package memcached
+
+type Config struct {
+ // Addr is the list of memcached addresses; port 11211 is used by default
+ Addr []string
+}
+
+func (s *Config) InitDefaults() {
+ s.Addr = []string{"localhost:11211"} // default address for memcached
+}
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go
new file mode 100644
index 00000000..f5111c04
--- /dev/null
+++ b/plugins/kv/memcached/plugin.go
@@ -0,0 +1,252 @@
+package memcached
+
+import (
+ "strings"
+ "time"
+
+ "github.com/bradfitz/gomemcache/memcache"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "memcached"
+
+var EmptyItem = kv.Item{}
+
+type Plugin struct {
+ // config
+ cfg *Config
+ // logger
+ log logger.Logger
+ // memcached client
+ client *memcache.Client
+}
+
+// NewMemcachedClient returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func NewMemcachedClient(url string) kv.Storage {
+ m := memcache.New(url)
+ return &Plugin{
+ client: m,
+ }
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ const op = errors.Op("memcached init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.log = log
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ s.client = memcache.New(s.cfg.Addr...)
+ return errCh
+}
+
+// Stop is a no-op: memcached has no stop/close or similar mechanism to close the connection
+func (s *Plugin) Stop() error {
+ return nil
+}
+
+// RPC returns the associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+// Has checks the key for existence
+func (s Plugin) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("memcached Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool, len(keys))
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ exist, err := s.client.Get(keys[i])
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return nil, err
+ }
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return m, nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (s Plugin) Get(key string) ([]byte, error) {
+ const op = errors.Op("memcached Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ data, err := s.client.Get(key)
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return nil, err
+ }
+ if data != nil {
+ // return the value by the key
+ return data.Value, nil
+ }
+ // data is nil for some reason and the error is also nil
+ return nil, nil
+}
+
+// MGet returns a map where the key is a string
+// and the value is a []byte
+func (s Plugin) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached MGet")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+ for i := range keys {
+ // note: the memcache client also provides GetMulti, which could be used here
+ data, err := s.client.Get(keys[i])
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil && err != memcache.ErrCacheMiss {
+ return nil, err
+ }
+ if data != nil {
+ m[keys[i]] = data.Value
+ }
+ }
+
+ return m, nil
+}
+
+// Set sets the KV pairs. Keys should be 250 bytes maximum
+// TTL:
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (s Plugin) Set(items ...kv.Item) error {
+ const op = errors.Op("memcached Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ if items[i] == EmptyItem {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // pre-allocate item
+ memcachedItem := &memcache.Item{
+ Key: items[i].Key,
+ // unsafe convert
+ Value: []byte(items[i].Value),
+ Flags: 0,
+ }
+
+ // add additional TTL in case of TTL isn't empty
+ if items[i].TTL != "" {
+ // verify the TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ memcachedItem.Expiration = int32(t.Unix())
+ }
+
+ err := s.client.Set(memcachedItem)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// MExpire updates the expiration time for the given keys. Expiration is either a relative
+// time from now, in seconds (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (s Plugin) MExpire(items ...kv.Item) error {
+ const op = errors.Op("memcached MExpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+
+ // Touch updates the expiry for the given key. The seconds parameter is either
+ // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+ // into the future at which time the item will expire. Zero means the item has
+ // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
+ // The key must be at most 250 bytes in length.
+ err = s.client.Touch(items[i].Key, int32(t.Unix()))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// TTL is not supported by memcached; the method always returns an error
+func (s Plugin) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached TTL")
+ return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
+}
+
+func (s Plugin) Delete(keys ...string) error {
+ const op = errors.Op("memcached Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ err := s.client.Delete(keys[i])
+ // ErrCacheMiss means the item wasn't present
+ if err != nil && err != memcache.ErrCacheMiss {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s Plugin) Close() error {
+ return nil
+}
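[editor's note] NewMemcachedClient is the only exported constructor here, so a quick standalone sketch of its use (it assumes a memcached server is listening on localhost:11211):

package main

import (
    "fmt"
    "time"

    "github.com/spiral/roadrunner/v2/plugins/kv"
    "github.com/spiral/roadrunner/v2/plugins/kv/memcached"
)

func main() {
    s := memcached.NewMemcachedClient("localhost:11211")

    // the RFC3339 TTL is converted by Set into an absolute Unix epoch for memcached
    ttl := time.Now().Add(5 * time.Minute).Format(time.RFC3339)
    if err := s.Set(kv.Item{Key: "key", Value: "hello world", TTL: ttl}); err != nil {
        panic(err)
    }

    v, err := s.Get("key")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(v))
}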
diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go
new file mode 100644
index 00000000..3d37748b
--- /dev/null
+++ b/plugins/kv/memcached/storage_test.go
@@ -0,0 +1,432 @@
+package memcached
+
+import (
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/stretchr/testify/assert"
+)
+
+func initStorage() kv.Storage {
+ return NewMemcachedClient("localhost:11211")
+}
+
+func cleanup(t *testing.T, s kv.Storage, keys ...string) {
+ err := s.Delete(keys...)
+ if err != nil {
+ t.Fatalf("error during cleanup: %s", err.Error())
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get("key")
+ assert.NoError(t, err)
+
+ if string(res) != "hello world" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete("key", "key2"))
+ // check that keys are not present
+ res, err = s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ // check
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has("")
+ assert.Error(t, err)
+
+ _, err = s.Get("")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.Error(t, s.Set(kv.Item{}))
+
+ err = s.Delete("")
+ assert.Error(t, err)
+
+ err = s.Delete("key", "")
+ assert.Error(t, err)
+
+ err = s.Delete("key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete("key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ // set is writable transaction
+ // it should stop readable
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has("key")
+ assert.NoError(t, err)
+ // key should be present now
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete("key" + strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}
diff --git a/plugins/kv/memory/config.go b/plugins/kv/memory/config.go
new file mode 100644
index 00000000..0816f734
--- /dev/null
+++ b/plugins/kv/memory/config.go
@@ -0,0 +1,15 @@
+package memory
+
+// Config is default config for the in-memory driver
+type Config struct {
+ // Enabled or disabled (true or false)
+ Enabled bool
+ // Interval for the check
+ Interval int
+}
+
+// InitDefaults sets the defaults; by default the driver is turned off
+func (c *Config) InitDefaults() {
+ c.Enabled = false
+ c.Interval = 60 // seconds
+}
diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/memory/plugin.go
new file mode 100644
index 00000000..d2d3721b
--- /dev/null
+++ b/plugins/kv/memory/plugin.go
@@ -0,0 +1,262 @@
+package memory
+
+import (
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// PluginName is user friendly name for the plugin
+const PluginName = "memory"
+
+type Plugin struct {
+ // heap is user map for the key-value pairs
+ heap sync.Map
+ stop chan struct{}
+
+ log logger.Logger
+ cfg *Config
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("in-memory storage init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.log = log
+
+ s.stop = make(chan struct{}, 1)
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ // start in-memory gc for kv
+ go s.gc()
+
+ return errCh
+}
+
+func (s *Plugin) Stop() error {
+ const op = errors.Op("in-memory storage stop")
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("in-memory storage Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if _, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = true
+ }
+ }
+
+ return m, nil
+}
+
+func (s *Plugin) Get(key string) ([]byte, error) {
+ const op = errors.Op("in-memory storage Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if data, exist := s.heap.Load(key); exist {
+ // the type assertion below could panic in theory,
+ // but only kv.Item values are ever stored here, see the Set function
+ return []byte(data.(kv.Item).Value), nil
+ }
+ return nil, nil
+}
+
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage MGet")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if value, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = value.(kv.Item).Value
+ }
+ }
+
+ return m, nil
+}
+
+func (s *Plugin) Set(items ...kv.Item) error {
+ const op = errors.Op("in-memory storage Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ // TTL is set
+ if items[i].TTL != "" {
+ // check the TTL in the item
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ }
+
+ s.heap.Store(items[i].Key, items[i])
+ }
+ return nil
+}
+
+// MExpire sets the expiration time for the provided keys.
+// If a key already has an expiration time, it will be overwritten.
+func (s *Plugin) MExpire(items ...kv.Item) error {
+ const op = errors.Op("in-memory storage MExpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // if the key exists, overwrite its TTL
+ if pItem, ok := s.heap.Load(items[i].Key); ok {
+ // check that time is correct
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ tmp := pItem.(kv.Item)
+ // we assume that the provided TTL is in the future;
+ // the in-memory driver is mainly FOR TESTING PURPOSES,
+ // so the logic isn't ideal
+ s.heap.Store(items[i].Key, kv.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ TTL: items[i].TTL,
+ })
+ }
+ }
+
+ return nil
+}
+
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage TTL")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = item.(kv.Item).TTL
+ }
+ }
+ return m, nil
+}
+
+func (s *Plugin) Delete(keys ...string) error {
+ const op = errors.Op("in-memory storage Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ s.heap.Delete(keys[i])
+ }
+ return nil
+}
+
+// Close stops the in-memory storage GC
+func (s *Plugin) Close() error {
+ s.stop <- struct{}{}
+ return nil
+}
+
+// RPC returns the associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+// ================================== PRIVATE ======================================
+
+func (s *Plugin) gc() {
+ // TODO check
+ ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+ for {
+ select {
+ case <-s.stop:
+ ticker.Stop()
+ return
+ case now := <-ticker.C:
+ // check all stored keys on every tick
+ s.heap.Range(func(key, value interface{}) bool {
+ v := value.(kv.Item)
+ if v.TTL == "" {
+ return true
+ }
+
+ t, err := time.Parse(time.RFC3339, v.TTL)
+ if err != nil {
+ return false
+ }
+
+ if now.After(t) {
+ s.log.Debug("key deleted", "key", key)
+ s.heap.Delete(key)
+ }
+ return true
+ })
+ }
+ }
+}
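[editor's note] The gc loop boils down to the per-key check sketched below (shown standalone): parse the stored RFC3339 TTL and delete the key once the deadline has passed.

package main

import (
    "fmt"
    "time"
)

func main() {
    // a TTL that expired one second ago, in the format the driver stores
    ttl := time.Now().Add(-time.Second).Format(time.RFC3339)

    deadline, err := time.Parse(time.RFC3339, ttl)
    if err != nil {
        panic(err) // a malformed TTL makes the Range callback return false
    }
    if time.Now().After(deadline) {
        fmt.Println("expired: the key would be deleted from the heap")
    }
}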
diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go
new file mode 100644
index 00000000..d3b24860
--- /dev/null
+++ b/plugins/kv/memory/storage_test.go
@@ -0,0 +1,473 @@
+package memory
+
+import (
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
+)
+
+func initStorage() kv.Storage {
+ p := &Plugin{
+ stop: make(chan struct{}),
+ }
+ p.cfg = &Config{
+ Enabled: true,
+ Interval: 1,
+ }
+
+ l, _ := zap.NewDevelopment()
+ p.log = logger.NewZapAdapter(l)
+
+ go p.gc()
+
+ return p
+}
+
+func cleanup(t *testing.T, s kv.Storage, keys ...string) {
+ err := s.Delete(keys...)
+ if err != nil {
+ t.Fatalf("error during cleanup: %s", err.Error())
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get("key")
+ assert.NoError(t, err)
+
+ if string(res) != "value" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // keys should be present now
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete("key", "key2"))
+ // check that the keys are not present
+ res, err = s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+	// ensure the key is absent before testing invalid arguments
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has("")
+ assert.Error(t, err)
+
+ _, err = s.Get("")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.NoError(t, s.Set(kv.Item{}))
+ _, err = s.Has("key")
+ assert.NoError(t, err)
+
+ err = s.Delete("")
+ assert.Error(t, err)
+
+ err = s.Delete("key", "")
+ assert.Error(t, err)
+
+ err = s.Delete("key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete("key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 2)
+ m, err := s.TTL("key", "key2")
+ assert.NoError(t, err)
+
+	// drop the fractional part of the remaining TTL, e.g. 4.0234 -> 4 (see the note after this file)
+	keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// drop the fractional part of the remaining TTL, e.g. 4.0234 -> 4
+	key2TTL, err := strconv.Atoi(m["key2"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.True(t, keyTTL < 5)
+ assert.True(t, key2TTL < 5)
+
+ time.Sleep(time.Second * 4)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+	// both keys must be present after Set
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+			// Set is a writable transaction,
+			// it should block concurrent readers
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has("key")
+ assert.NoError(t, err)
+			// the key must be present
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete("key" + strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}
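
The single-character slice used in TestStorage_SetExpire_TTL above only holds up while the remaining TTL is below ten seconds. Assuming the memory driver reports TTL as a float-formatted seconds string (which is what the in-test comment implies), a sturdier truncation is sketched below; the variable names are illustrative only.

	// m comes from s.TTL("key", "key2"); values are assumed to be seconds as strings, e.g. "4.0234"
	raw, ok := m["key"].(string)
	if !ok {
		t.Fatalf("unexpected TTL type %T", m["key"])
	}
	secs, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		t.Fatal(err)
	}
	keyTTL := int(secs) // 4.0234 -> 4, regardless of how many digits remain
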
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
new file mode 100644
index 00000000..751f0d12
--- /dev/null
+++ b/plugins/kv/rpc.go
@@ -0,0 +1,110 @@
+package kv
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// RPCServer is an RPC wrapper around a Storage plugin
+type RPCServer struct {
+ // svc is a plugin implementing Storage interface
+ svc Storage
+ // Logger
+ log logger.Logger
+}
+
+// NewRPCServer constructs an RPC server for the particular plugin
+func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
+ return &RPCServer{
+ svc: srv,
+ log: log,
+ }
+}
+
+// Has checks if the given keys exist in the storage
+func (r *RPCServer) Has(in []string, res *map[string]bool) error {
+ const op = errors.Op("rpc server Has")
+ ret, err := r.svc.Has(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // update the value in the pointer
+ *res = ret
+ return nil
+}
+
+// Set stores the given items in the storage
+func (r *RPCServer) Set(in []Item, ok *bool) error {
+ const op = errors.Op("rpc server Set")
+
+ err := r.svc.Set(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *ok = true
+ return nil
+}
+
+// MGet returns the values for the given keys
+func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
+ const op = errors.Op("rpc server MGet")
+ ret, err := r.svc.MGet(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // update return value
+ *res = ret
+ return nil
+}
+
+// MExpire updates the TTL for the given items
+func (r *RPCServer) MExpire(in []Item, ok *bool) error {
+ const op = errors.Op("rpc server MExpire")
+
+ err := r.svc.MExpire(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *ok = true
+ return nil
+}
+
+// TTL returns the remaining time to live for the given keys
+func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
+ const op = errors.Op("rpc server TTL")
+
+ ret, err := r.svc.TTL(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *res = ret
+ return nil
+}
+
+// Delete removes the given keys from the storage
+func (r *RPCServer) Delete(in []string, ok *bool) error {
+ const op = errors.Op("rpc server Delete")
+ err := r.svc.Delete(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+// Close closes the underlying storage
+func (r *RPCServer) Close(storage string, ok *bool) error {
+ const op = errors.Op("rpc server Close")
+ err := r.svc.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ *ok = true
+
+ return nil
+}
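
The RPCServer methods above follow the standard net/rpc shape (an argument value, a reply pointer, an error return), so the wrapper can be driven by plain net/rpc. The sketch below is illustrative only: the package name, the ServeKV helper, the TCP address and the "kv" service name are assumptions, and RoadRunner itself registers the service through its rpc plugin and speaks the goridge protocol rather than the default gob codec.

	// Package example sketches how the kv RPCServer could be exposed over plain net/rpc.
	package example

	import (
		"net"
		"net/rpc"

		"github.com/spiral/roadrunner/v2/plugins/kv"
		"github.com/spiral/roadrunner/v2/plugins/logger"
	)

	// ServeKV is a hypothetical helper; a concrete Storage (memory, boltdb, memcached) and a
	// logger would normally be wired by the plugin container rather than passed in by hand.
	func ServeKV(storage kv.Storage, log logger.Logger) error {
		srv := kv.NewRPCServer(storage, log)
		if err := rpc.RegisterName("kv", srv); err != nil {
			return err
		}

		ln, err := net.Listen("tcp", "127.0.0.1:6001")
		if err != nil {
			return err
		}

		// serve each connection with net/rpc's default gob codec
		for {
			conn, err := ln.Accept()
			if err != nil {
				return err
			}
			go rpc.ServeConn(conn)
		}
	}

A client would then issue calls such as client.Call("kv.Has", []string{"key"}, &res) with res declared as map[string]bool, mirroring the Has signature above.
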
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index 452e03a3..eb1b61b2 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -57,7 +57,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Res
return nil
}
}
- return errors.E(op, errors.Skip)
+ return errors.E(op, errors.SkipFile)
},
Files: make(map[string]os.FileInfo),
Ignored: ignored,
diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go
index c232f16f..08c85af9 100644
--- a/plugins/reload/watcher.go
+++ b/plugins/reload/watcher.go
@@ -179,7 +179,7 @@ outer:
// if filename does not contain pattern --> ignore that file
if w.watcherConfigs[serviceName].FilePatterns != nil && w.watcherConfigs[serviceName].FilterHooks != nil {
err = w.watcherConfigs[serviceName].FilterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].FilePatterns)
- if errors.Is(errors.Skip, err) {
+ if errors.Is(errors.SkipFile, err) {
continue outer
}
}
@@ -293,7 +293,7 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o
// if filename does not contain pattern --> ignore that file
err = w.watcherConfigs[serviceName].FilterHooks(info.Name(), w.watcherConfigs[serviceName].FilePatterns)
- if errors.Is(errors.Skip, err) {
+ if errors.Is(errors.SkipFile, err) {
return nil
}
diff --git a/plugins/server/config.go b/plugins/server/config.go
index 4bef3c5f..2bf30e70 100644
--- a/plugins/server/config.go
+++ b/plugins/server/config.go
@@ -28,6 +28,7 @@ type Config struct {
RelayTimeout time.Duration
}
+// InitDefaults for the server config
func (cfg *Config) InitDefaults() {
if cfg.Relay == "" {
cfg.Relay = "pipes"
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 98345694..a2d8b92b 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -10,6 +10,7 @@ import (
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
)
+// Env is a map of environment variables
type Env map[string]string
// Server creates workers for the application.
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 5d1f26d3..8a843723 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -21,6 +21,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
+// PluginName is the name of the server plugin
const PluginName = "server"
// Plugin manages worker
@@ -53,11 +54,13 @@ func (server *Plugin) Name() string {
return PluginName
}
+// Serve starts the server plugin (just a stub here to satisfy the interface)
func (server *Plugin) Serve() chan error {
errCh := make(chan error, 1)
return errCh
}
+// Stop closes the factory chosen in the config
func (server *Plugin) Stop() error {
if server.factory == nil {
return nil